More fixes/improvements

* Log the right shard number for wallet/getFreeToken
* Don't try to use empty commit bitmap from genesis block
* Fail hard if block reward or finalization fails
* Log the public key if message sig verification fails
* Extract and save chain state from received blocks in order to
  eliminate deep tail recursion
* Properly deep copy chain config (previously ChainID was being shared
  among chains)
* Eliminate chain use-before-init window in node.New()
* Save genesis epoch shard state in new blockchain database
* Do not check epoch of the received shard state message (temp
  workaround – we should introduce the check elsewhere)
* Propose an empty block if no transactions have been received for 10
  seconds
pull/839/head
Eugene Kim 6 years ago
parent 664c6c1513
commit 7445699909
  1. 2
      cmd/client/wallet/main.go
  2. 10
      cmd/harmony/main.go
  3. 11
      consensus/consensus.go
  4. 10
      consensus/consensus_service.go
  5. 14
      core/blockchain.go
  6. 5
      core/chain_makers.go
  7. 6
      core/state_processor.go
  8. 7
      crypto/bls/bls.go
  9. 2
      internal/configs/node/config.go
  10. 11
      internal/shardchain/shardchains.go
  11. 19
      node/node.go
  12. 44
      node/node_genesis.go
  13. 8
      node/node_handler.go
  14. 72
      node/node_newblock.go
  15. 4
      node/worker/worker.go
  16. 2
      test/deploy.sh

@ -556,7 +556,7 @@ func GetFreeToken(address common.Address) {
log.Debug("GetFreeToken", "response", response) log.Debug("GetFreeToken", "response", response)
txID := common.Hash{} txID := common.Hash{}
txID.SetBytes(response.TxId) txID.SetBytes(response.TxId)
fmt.Printf("Transaction Id requesting free token in shard %d: %s\n", int(0), txID.Hex()) fmt.Printf("Transaction Id requesting free token in shard %d: %s\n", i, txID.Hex())
break break
} }
} }

@ -388,7 +388,15 @@ func main() {
// go currentNode.SupportBeaconSyncing() // go currentNode.SupportBeaconSyncing()
//} //}
utils.GetLogInstance().Info("==== New Harmony Node ====", "BlsPubKey", hex.EncodeToString(nodeConfig.ConsensusPubKey.Serialize()), "ShardID", nodeConfig.ShardID, "ShardGroupID", nodeConfig.GetShardGroupID(), "BeaconGroupID", nodeConfig.GetBeaconGroupID(), "ClientGroupID", nodeConfig.GetClientGroupID(), "Role", currentNode.NodeConfig.Role(), "multiaddress", fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", *ip, *port, nodeConfig.Host.GetID().Pretty())) utils.GetLogInstance().Info("==== New Harmony Node ====",
"BlsPubKey", hex.EncodeToString(nodeConfig.ConsensusPubKey.Serialize()),
"ShardID", nodeConfig.ShardID,
"ShardGroupID", nodeConfig.GetShardGroupID(),
"BeaconGroupID", nodeConfig.GetBeaconGroupID(),
"ClientGroupID", nodeConfig.GetClientGroupID(),
"Role", currentNode.NodeConfig.Role(),
"multiaddress", fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s",
*ip, *port, nodeConfig.Host.GetID().Pretty()))
currentNode.MaybeKeepSendingPongMessage() currentNode.MaybeKeepSendingPongMessage()
go currentNode.SupportSyncing() go currentNode.SupportSyncing()

@ -279,16 +279,21 @@ func accumulateRewards(
bc consensus_engine.ChainReader, state *state.DB, header *types.Header, bc consensus_engine.ChainReader, state *state.DB, header *types.Header,
) error { ) error {
logger := header.Logger(utils.GetLogInstance()) logger := header.Logger(utils.GetLogInstance())
getLogger := func() log.Logger { return utils.WithCallerSkip(logger, 1) } getLogger := func() log.Logger { return utils.WithCallerSkip(logger, 1) }
blockNum := header.Number.Uint64() blockNum := header.Number.Uint64()
if blockNum == 0 { if blockNum == 0 {
// Epoch block doesn't have any reward // Epoch block has no parent to reward.
return nil return nil
} }
parentHeader := bc.GetHeaderByNumber(blockNum - 1) parentHeader := bc.GetHeaderByNumber(blockNum - 1)
if parentHeader == nil { if parentHeader == nil {
return ctxerror.New("cannot find parent block header in DB", return ctxerror.New("cannot find parent block header in DB",
"parentBlockNumber", blockNum - 1) "parentBlockNumber", blockNum-1)
}
if parentHeader.Number.Cmp(common.Big0) == 0 {
// Parent is an epoch block,
// which is not signed in the usual manner therefore rewards nothing.
return nil
} }
shardState, err := bc.ReadShardState(parentHeader.Epoch) shardState, err := bc.ReadShardState(parentHeader.Epoch)
if err != nil { if err != nil {

@ -22,6 +22,7 @@ import (
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
bls_cosi "github.com/harmony-one/harmony/crypto/bls" bls_cosi "github.com/harmony-one/harmony/crypto/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/p2p/host"
@ -242,7 +243,9 @@ func (consensus *Consensus) VerifySeal(chain consensus_engine.ChainReader, heade
func (consensus *Consensus) Finalize(chain consensus_engine.ChainReader, header *types.Header, state *state.DB, txs []*types.Transaction, receipts []*types.Receipt) (*types.Block, error) { func (consensus *Consensus) Finalize(chain consensus_engine.ChainReader, header *types.Header, state *state.DB, txs []*types.Transaction, receipts []*types.Receipt) (*types.Block, error) {
// Accumulate any block and uncle rewards and commit the final state root // Accumulate any block and uncle rewards and commit the final state root
// Header seems complete, assemble into a block and return // Header seems complete, assemble into a block and return
accumulateRewards(chain, state, header) if err := accumulateRewards(chain, state, header); err != nil {
return nil, ctxerror.New("cannot pay block reward").WithCause(err)
}
header.Root = state.IntermediateRoot(false) header.Root = state.IntermediateRoot(false)
return types.NewBlock(header, txs, receipts), nil return types.NewBlock(header, txs, receipts), nil
} }
@ -498,7 +501,10 @@ func (consensus *Consensus) checkConsensusMessage(message *msg_pb.Message, publi
// Verify message signature // Verify message signature
err := verifyMessageSig(publicKey, message) err := verifyMessageSig(publicKey, message)
if err != nil { if err != nil {
utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err) ctxerror.Log15(utils.GetLogger().Warn,
ctxerror.New("failed to verify the message signature",
"publicKey", publicKey.GetHexString(),
).WithCause(err))
return consensus_engine.ErrInvalidConsensusMessage return consensus_engine.ErrInvalidConsensusMessage
} }
if !bytes.Equal(blockHash, consensus.blockHash[:]) { if !bytes.Equal(blockHash, consensus.blockHash[:]) {

@ -1077,6 +1077,20 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types.
func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
n, events, logs, err := bc.insertChain(chain) n, events, logs, err := bc.insertChain(chain)
bc.PostChainEvents(events, logs) bc.PostChainEvents(events, logs)
// TODO ek – make this a post-chain event
if err == nil {
for _, block := range chain {
header := block.Header()
if header.ShardStateHash != (common.Hash{}) {
epoch := new(big.Int).Add(header.Epoch, common.Big1)
err = bc.WriteShardState(epoch, header.ShardState)
if err != nil {
ctxerror.Log15(header.Logger(utils.GetLogger()).Warn,
ctxerror.New("cannot store shard state").WithCause(err))
}
}
}
}
return n, err return n, err
} }

@ -190,7 +190,10 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse
} }
if b.engine != nil { if b.engine != nil {
// Finalize and seal the block // Finalize and seal the block
block, _ := b.engine.Finalize(chainreader, b.header, statedb, b.txs, b.receipts) block, err := b.engine.Finalize(chainreader, b.header, statedb, b.txs, b.receipts)
if err != nil {
panic(err)
}
// Write state changes to db // Write state changes to db
root, err := statedb.Commit(config.IsEIP158(b.header.Number)) root, err := statedb.Commit(config.IsEIP158(b.header.Number))

@ -24,6 +24,7 @@ import (
"github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm" "github.com/harmony-one/harmony/core/vm"
"github.com/harmony-one/harmony/internal/ctxerror"
) )
// StateProcessor is a basic Processor, which takes care of transitioning // StateProcessor is a basic Processor, which takes care of transitioning
@ -75,7 +76,10 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.DB, cfg vm.C
allLogs = append(allLogs, receipt.Logs...) allLogs = append(allLogs, receipt.Logs...)
} }
// Finalize the block, applying any consensus engine specific extras (e.g. block rewards) // Finalize the block, applying any consensus engine specific extras (e.g. block rewards)
p.engine.Finalize(p.bc, header, statedb, block.Transactions(), receipts) _, err := p.engine.Finalize(p.bc, header, statedb, block.Transactions(), receipts)
if err != nil {
return nil, nil, 0, ctxerror.New("cannot finalize block").WithCause(err)
}
return receipts, allLogs, *usedGas, nil return receipts, allLogs, *usedGas, nil
} }

@ -2,9 +2,10 @@ package bls
import ( import (
"errors" "errors"
"fmt"
"github.com/harmony-one/bls/ffi/go/bls" "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/internal/ctxerror"
) )
func init() { func init() {
@ -84,7 +85,9 @@ func (m *Mask) Len() int {
// cosigners 0-7, bits 0-7 of byte 1 correspond to cosigners 8-15, etc. // cosigners 0-7, bits 0-7 of byte 1 correspond to cosigners 8-15, etc.
func (m *Mask) SetMask(mask []byte) error { func (m *Mask) SetMask(mask []byte) error {
if m.Len() != len(mask) { if m.Len() != len(mask) {
return fmt.Errorf("mismatching Bitmap lengths") return ctxerror.New("mismatching bitmap lengths",
"expectedBitmapLength", m.Len(),
"providedBitmapLength", len(mask))
} }
for i := range m.publics { for i := range m.publics {
byt := i >> 3 byt := i >> 3

@ -78,7 +78,7 @@ type ConfigType struct {
ConsensusPubKey *bls.PublicKey ConsensusPubKey *bls.PublicKey
// Database directory // Database directory
DBDir string DBDir string
SelfPeer p2p.Peer SelfPeer p2p.Peer
Leader p2p.Peer Leader p2p.Peer

@ -1,6 +1,7 @@
package shardchain package shardchain
import ( import (
"math/big"
"sync" "sync"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -41,9 +42,9 @@ func NewCollection(
) *collection { ) *collection {
return &collection{ return &collection{
dbFactory: dbFactory, dbFactory: dbFactory,
dbInit: dbInit, dbInit: dbInit,
engine: engine, engine: engine,
pool: make(map[uint32]*core.BlockChain), pool: make(map[uint32]*core.BlockChain),
} }
} }
@ -73,7 +74,7 @@ func (sc *collection) ShardChain(shardID uint32) (*core.BlockChain, error) {
"shardID", shardID) "shardID", shardID)
if err := sc.dbInit.InitChainDB(db, shardID); err != nil { if err := sc.dbInit.InitChainDB(db, shardID); err != nil {
return nil, ctxerror.New("cannot initialize a new chain database"). return nil, ctxerror.New("cannot initialize a new chain database").
WithCause(err) WithCause(err)
} }
} }
var cacheConfig *core.CacheConfig var cacheConfig *core.CacheConfig
@ -82,7 +83,7 @@ func (sc *collection) ShardChain(shardID uint32) (*core.BlockChain, error) {
cacheConfig = &core.CacheConfig{Disabled: true} cacheConfig = &core.CacheConfig{Disabled: true}
} }
chainConfig := *params.TestChainConfig chainConfig := *params.TestChainConfig
chainConfig.ChainID.SetUint64(uint64(shardID)) chainConfig.ChainID = big.NewInt(int64(shardID))
bc, err := core.NewBlockChain( bc, err := core.NewBlockChain(
db, cacheConfig, &chainConfig, sc.engine, vm.Config{}, nil, db, cacheConfig, &chainConfig, sc.engine, vm.Config{}, nil,
) )

@ -9,7 +9,6 @@ import (
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/harmony-one/bls/ffi/go/bls" "github.com/harmony-one/bls/ffi/go/bls"
@ -26,7 +25,6 @@ import (
"github.com/harmony-one/harmony/contracts/structs" "github.com/harmony-one/harmony/contracts/structs"
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm"
"github.com/harmony-one/harmony/crypto/pki" "github.com/harmony-one/harmony/crypto/pki"
"github.com/harmony-one/harmony/drand" "github.com/harmony-one/harmony/drand"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node" nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
@ -270,7 +268,6 @@ func (node *Node) GetSyncID() [SyncIDLength]byte {
// New creates a new node. // New creates a new node.
func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardchain.DBFactory, isArchival bool) *Node { func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardchain.DBFactory, isArchival bool) *Node {
var chain *core.BlockChain
var err error var err error
node := Node{} node := Node{}
@ -296,7 +293,7 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc
node.Consensus = consensusObj node.Consensus = consensusObj
// Load the chains. // Load the chains.
chain = node.Blockchain() // this also sets node.isFirstTime if the DB is fresh chain := node.Blockchain() // this also sets node.isFirstTime if the DB is fresh
_ = node.Beaconchain() _ = node.Beaconchain()
node.BlockChannel = make(chan *types.Block) node.BlockChannel = make(chan *types.Block)
@ -488,17 +485,3 @@ func (node *Node) initNodeConfiguration() (service.NodeConfig, chan p2p.Peer) {
return nodeConfig, chanPeer return nodeConfig, chanPeer
} }
// InitBlockChainFromDB retrieves the latest blockchain and state available from the local database
func (node *Node) InitBlockChainFromDB(db ethdb.Database, consensus *consensus.Consensus, isArchival bool) (*core.BlockChain, error) {
chainConfig := params.TestChainConfig
if consensus != nil {
chainConfig.ChainID = big.NewInt(int64(consensus.ShardID)) // Use ChainID as piggybacked ShardID
}
cacheConfig := core.CacheConfig{}
if isArchival {
cacheConfig = core.CacheConfig{Disabled: true, TrieNodeLimit: 256 * 1024 * 1024, TrieTimeLimit: 30 * time.Second}
}
chain, err := core.NewBlockChain(db, &cacheConfig, chainConfig, consensus, vm.Config{}, nil)
return chain, err
}

@ -10,8 +10,12 @@ import (
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/pkg/errors"
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/rawdb"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/internal/utils/contract" "github.com/harmony-one/harmony/internal/utils/contract"
) )
@ -32,7 +36,22 @@ type genesisInitializer struct {
// InitChainDB sets up a new genesis block in the database for the given shard. // InitChainDB sets up a new genesis block in the database for the given shard.
func (gi *genesisInitializer) InitChainDB(db ethdb.Database, shardID uint32) error { func (gi *genesisInitializer) InitChainDB(db ethdb.Database, shardID uint32) error {
return gi.node.SetupGenesisBlock(db, shardID) shardState := core.GetInitShardState()
if shardID != 0 {
// store only the local shard
if c := shardState.FindCommitteeByID(shardID); c == nil {
return errors.New("cannot find local shard in genesis")
} else {
shardState = types.ShardState{*c}
}
}
if err := rawdb.WriteShardState(db, common.Big0, shardState); err != nil {
return ctxerror.New("cannot store epoch shard state").WithCause(err)
}
if err := gi.node.SetupGenesisBlock(db, shardID); err != nil {
return ctxerror.New("cannot setup genesis block").WithCause(err)
}
return nil
} }
// GenesisBlockSetup setups a genesis blockchain. // GenesisBlockSetup setups a genesis blockchain.
@ -78,25 +97,25 @@ func (node *Node) SetupGenesisBlock(db ethdb.Database, shardID uint32) error {
// Store genesis block into db. // Store genesis block into db.
_, err := gspec.Commit(db) _, err := gspec.Commit(db)
return err return err
} }
// CreateTestBankKeys deterministically generates testing addresses. // CreateTestBankKeys deterministically generates testing addresses.
func CreateTestBankKeys(numAddresses int) (keys []*ecdsa.PrivateKey, err error) { func CreateTestBankKeys(numAddresses int) (keys []*ecdsa.PrivateKey, err error) {
rand.Seed(0) rand.Seed(0)
bytes := make([]byte, 1000000) bytes := make([]byte, 1000000)
for i := range bytes { for i := range bytes {
bytes[i] = byte(rand.Intn(100)) bytes[i] = byte(rand.Intn(100))
} }
reader := strings.NewReader(string(bytes)) reader := strings.NewReader(string(bytes))
for i := 0; i < numAddresses; i++ { for i := 0; i < numAddresses; i++ {
key, err := ecdsa.GenerateKey(crypto.S256(), reader) key, err := ecdsa.GenerateKey(crypto.S256(), reader)
if err != nil { if err != nil {
return nil, err return nil, err
} }
keys = append(keys, key) keys = append(keys, key)
} }
return keys, nil return keys, nil
} }
// CreateGenesisAllocWithTestingAddresses create the genesis block allocation that contains deterministically // CreateGenesisAllocWithTestingAddresses create the genesis block allocation that contains deterministically
@ -113,7 +132,6 @@ func (node *Node) CreateGenesisAllocWithTestingAddresses(numAddress int) core.Ge
return genesisAloc return genesisAloc
} }
// AddNodeAddressesToGenesisAlloc adds to the genesis block allocation the accounts used for network validators/nodes, // AddNodeAddressesToGenesisAlloc adds to the genesis block allocation the accounts used for network validators/nodes,
// including the account used by the nodes of the initial beacon chain and later new nodes. // including the account used by the nodes of the initial beacon chain and later new nodes.
func AddNodeAddressesToGenesisAlloc(genesisAlloc core.GenesisAlloc) { func AddNodeAddressesToGenesisAlloc(genesisAlloc core.GenesisAlloc) {

@ -673,15 +673,7 @@ func (node *Node) epochShardStateMessageHandler(msgPayload []byte) error {
if node.Consensus == nil && node.NodeConfig.Role() != nodeconfig.NewNode { if node.Consensus == nil && node.NodeConfig.Role() != nodeconfig.NewNode {
return nil return nil
} }
// Remember the master sharding state if the epoch ID matches.
curEpoch := node.Blockchain().CurrentBlock().Header().Epoch
expectedEpoch := new(big.Int).Add(curEpoch, common.Big1)
receivedEpoch := big.NewInt(int64(epochShardState.Epoch)) receivedEpoch := big.NewInt(int64(epochShardState.Epoch))
if receivedEpoch.Cmp(expectedEpoch) != 0 {
return ctxerror.New("invalid epoch in epoch shard state message",
"receivedEpoch", receivedEpoch,
"expectedEpoch", expectedEpoch)
}
getLogger().Info("received new shard state", "epoch", receivedEpoch) getLogger().Info("received new shard state", "epoch", receivedEpoch)
node.nextShardState.master = epochShardState node.nextShardState.master = epochShardState
if node.Consensus.IsLeader { if node.Consensus.IsLeader {

@ -9,6 +9,7 @@ import (
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
) )
@ -17,7 +18,7 @@ const (
DefaultThreshold = 1 DefaultThreshold = 1
FirstTimeThreshold = 2 FirstTimeThreshold = 2
ConsensusTimeOut = 10 ConsensusTimeOut = 10
PeriodicBlock = 3 * time.Second PeriodicBlock = 1 * time.Second
) )
// WaitForConsensusReady listen for the readiness signal from consensus and generate new block for consensus. // WaitForConsensusReady listen for the readiness signal from consensus and generate new block for consensus.
@ -32,6 +33,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}, stopChan chan
firstTime := true firstTime := true
var newBlock *types.Block var newBlock *types.Block
timeoutCount := 0 timeoutCount := 0
deadline := time.Now().Add(10 * time.Second)
for { for {
// keep waiting for Consensus ready // keep waiting for Consensus ready
select { select {
@ -55,21 +57,30 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}, stopChan chan
threshold = FirstTimeThreshold threshold = FirstTimeThreshold
firstTime = false firstTime = false
} }
if len(node.pendingTransactions) >= threshold { if len(node.pendingTransactions) >= threshold || !time.Now().Before(deadline) {
deadline = time.Now().Add(10 * time.Second)
utils.GetLogInstance().Debug("PROPOSING NEW BLOCK ------------------------------------------------", "blockNum", node.Blockchain().CurrentBlock().NumberU64()+1, "threshold", threshold, "pendingTransactions", len(node.pendingTransactions)) utils.GetLogInstance().Debug("PROPOSING NEW BLOCK ------------------------------------------------", "blockNum", node.Blockchain().CurrentBlock().NumberU64()+1, "threshold", threshold, "pendingTransactions", len(node.pendingTransactions))
// Normal tx block consensus // Normal tx block consensus
selectedTxs := node.getTransactionsForNewBlock(MaxNumberOfTransactionsPerBlock) selectedTxs := node.getTransactionsForNewBlock(MaxNumberOfTransactionsPerBlock)
if len(selectedTxs) != 0 { if err := node.Worker.CommitTransactions(selectedTxs); err != nil {
node.Worker.CommitTransactions(selectedTxs) ctxerror.Log15(utils.GetLogger().Error,
block, err := node.Worker.Commit() ctxerror.New("cannot commit transacttions").
if err != nil { WithCause(err))
utils.GetLogInstance().Debug("Failed committing new block", "Error", err) }
} else { block, err := node.Worker.Commit()
node.proposeShardState(block) if err != nil {
newBlock = block ctxerror.Log15(utils.GetLogInstance().Error,
utils.GetLogInstance().Debug("Successfully proposed new block", "blockNum", block.NumberU64(), "numTxs", block.Transactions().Len()) ctxerror.New("Failed committing new block").
break WithCause(err))
} } else if err := node.proposeShardState(block); err != nil {
ctxerror.Log15(utils.GetLogger().Error,
ctxerror.New("cannot add shard state").
WithCause(err))
} else {
newBlock = block
utils.GetLogInstance().Debug("Successfully proposed new block", "blockNum", block.NumberU64(), "numTxs", block.Transactions().Len())
threshold = DefaultThreshold
break
} }
} }
// If not enough transactions to run Consensus, // If not enough transactions to run Consensus,
@ -106,6 +117,7 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch
case <-readySignal: case <-readySignal:
firstTry := true firstTry := true
deadline := time.Now().Add(10 * time.Second)
for { for {
if !firstTry { if !firstTry {
time.Sleep(PeriodicBlock) time.Sleep(PeriodicBlock)
@ -118,28 +130,36 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch
threshold = FirstTimeThreshold threshold = FirstTimeThreshold
firstTime = false firstTime = false
} }
if len(node.pendingTransactions) < threshold { if len(node.pendingTransactions) < threshold && time.Now().Before(deadline) {
continue continue
} }
deadline = time.Now().Add(10 * time.Second)
// Normal tx block consensus // Normal tx block consensus
selectedTxs := node.getTransactionsForNewBlock(MaxNumberOfTransactionsPerBlock) selectedTxs := node.getTransactionsForNewBlock(MaxNumberOfTransactionsPerBlock)
if len(selectedTxs) == 0 {
continue
}
utils.GetLogInstance().Debug("PROPOSING NEW BLOCK ------------------------------------------------", "blockNum", node.Blockchain().CurrentBlock().NumberU64()+1, "threshold", threshold, "selectedTxs", len(selectedTxs)) utils.GetLogInstance().Debug("PROPOSING NEW BLOCK ------------------------------------------------", "blockNum", node.Blockchain().CurrentBlock().NumberU64()+1, "threshold", threshold, "selectedTxs", len(selectedTxs))
node.Worker.CommitTransactions(selectedTxs) if err := node.Worker.CommitTransactions(selectedTxs); err != nil {
ctxerror.Log15(utils.GetLogger().Error,
ctxerror.New("cannot commit transactions").
WithCause(err))
}
block, err := node.Worker.Commit() block, err := node.Worker.Commit()
if err != nil { if err != nil {
utils.GetLogInstance().Debug("Failed committing new block", "Error", err) ctxerror.Log15(utils.GetLogger().Error,
ctxerror.New("cannot commit new block").
WithCause(err))
continue continue
} else if err := node.proposeShardState(block); err != nil {
ctxerror.Log15(utils.GetLogger().Error,
ctxerror.New("cannot add shard state").
WithCause(err))
} else {
newBlock := block
utils.GetLogInstance().Debug("Successfully proposed new block", "blockNum", block.NumberU64(), "numTxs", block.Transactions().Len())
// Send the new block to Consensus so it can be confirmed.
node.BlockChannel <- newBlock
break
} }
node.proposeShardState(block)
newBlock := block
utils.GetLogInstance().Debug("Successfully proposed new block", "blockNum", block.NumberU64(), "numTxs", block.Transactions().Len())
// Send the new block to Consensus so it can be confirmed.
node.BlockChannel <- newBlock
break
} }
} }
} }

@ -8,11 +8,13 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
consensus_engine "github.com/harmony-one/harmony/consensus/engine" consensus_engine "github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm" "github.com/harmony-one/harmony/core/vm"
"github.com/harmony-one/harmony/internal/ctxerror"
) )
// environment is the worker's current environment and holds all of the current state information. // environment is the worker's current environment and holds all of the current state information.
@ -157,7 +159,7 @@ func (w *Worker) Commit() (*types.Block, error) {
s := w.current.state.Copy() s := w.current.state.Copy()
block, err := w.engine.Finalize(w.chain, w.current.header, s, w.current.txs, w.current.receipts) block, err := w.engine.Finalize(w.chain, w.current.header, s, w.current.txs, w.current.receipts)
if err != nil { if err != nil {
return nil, err return nil, ctxerror.New("cannot finalize block").WithCause(err)
} }
return block, nil return block, nil
} }

@ -168,7 +168,7 @@ if [ "$TXGEN" == "true" ]; then
line=$(grep client $config) line=$(grep client $config)
IFS=' ' read ip port mode shardID <<< $line IFS=' ' read ip port mode shardID <<< $line
if [ "$mode" == "client" ]; then if [ "$mode" == "client" ]; then
$DRYRUN $ROOT/bin/txgen -log_folder $log_folder -duration $DURATION -ip $ip -port $port $HMY_OPT2 > $LOG_FILE 2>&1 $DRYRUN $ROOT/bin/txgen -log_folder $log_folder -duration $DURATION -ip $ip -port $port -bootnodes "${BN_MA}" > $LOG_FILE 2>&1
fi fi
else else
sleep $DURATION sleep $DURATION

Loading…
Cancel
Save