diff --git a/api/proto/node/node_test.go b/api/proto/node/node_test.go index 2fced72c7..f5bbb81ac 100644 --- a/api/proto/node/node_test.go +++ b/api/proto/node/node_test.go @@ -75,6 +75,7 @@ func TestConstructBlocksSyncMessage(t *testing.T) { head := &types.Header{ Number: new(big.Int).SetUint64(uint64(10000)), Nonce: types.EncodeNonce(uint64(10000)), + Epoch: big.NewInt(0), ShardID: 0, Time: new(big.Int).SetUint64(uint64(100000)), Root: root, diff --git a/cmd/client/txgen/main.go b/cmd/client/txgen/main.go index 1caae9ca0..797fafd9d 100644 --- a/cmd/client/txgen/main.go +++ b/cmd/client/txgen/main.go @@ -13,11 +13,13 @@ import ( "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/internal/shardchain" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" bls2 "github.com/harmony-one/bls/ffi/go/bls" + "github.com/harmony-one/harmony/api/client" proto_node "github.com/harmony-one/harmony/api/proto/node" "github.com/harmony-one/harmony/core/types" @@ -97,7 +99,8 @@ func setUpTXGen() *node.Node { os.Exit(1) } consensusObj, err := consensus.New(myhost, uint32(shardID), p2p.Peer{}, nil) - txGen := node.New(myhost, consensusObj, nil, false) //Changed it : no longer archival node. + chainDBFactory := &shardchain.MemDBFactory{} + txGen := node.New(myhost, consensusObj, chainDBFactory, false) //Changed it : no longer archival node. txGen.Client = client.NewClient(txGen.GetHost(), uint32(shardID)) consensusObj.SetStakeInfoFinder(gsif) consensusObj.ChainReader = txGen.Blockchain() diff --git a/cmd/client/wallet/main.go b/cmd/client/wallet/main.go index 8ca53f3cc..e556318fd 100644 --- a/cmd/client/wallet/main.go +++ b/cmd/client/wallet/main.go @@ -15,12 +15,14 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" + "github.com/harmony-one/harmony/api/client" clientService "github.com/harmony-one/harmony/api/client/service" proto_node "github.com/harmony-one/harmony/api/proto/node" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" + "github.com/harmony-one/harmony/internal/shardchain" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/node" "github.com/harmony-one/harmony/p2p" @@ -239,7 +241,8 @@ func createWalletNode() *node.Node { if err != nil { panic(err) } - w := node.New(host, nil, nil, false) + chainDBFactory := &shardchain.MemDBFactory{} + w := node.New(host, nil, chainDBFactory, false) w.Client = client.NewClient(w.GetHost(), uint32(shardID)) w.NodeConfig.SetRole(nodeconfig.ClientNode) @@ -565,7 +568,7 @@ func GetFreeToken(address common.Address) { log.Debug("GetFreeToken", "response", response) txID := common.Hash{} txID.SetBytes(response.TxId) - fmt.Printf("Transaction Id requesting free token in shard %d: %s\n", int(0), txID.Hex()) + fmt.Printf("Transaction Id requesting free token in shard %d: %s\n", i, txID.Hex()) break } } diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 99cfecaaf..183316021 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -22,6 +22,7 @@ import ( nodeconfig "github.com/harmony-one/harmony/internal/configs/node" hmykey "github.com/harmony-one/harmony/internal/keystore" "github.com/harmony-one/harmony/internal/profiler" + "github.com/harmony-one/harmony/internal/shardchain" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils/contract" "github.com/harmony-one/harmony/node" @@ -109,6 +110,9 @@ var ( myPass = "" // logging verbosity verbosity = flag.Int("verbosity", 5, "Logging verbosity: 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=detail (default: 5)") + + // dbDir is the database directory. + dbDir = flag.String("db_dir", "", "blockchain database directory") ) func initSetup() { @@ -229,16 +233,6 @@ func createGlobalConfig() *nodeconfig.ConfigType { } // Key Setup ================= [End] - // Initialize leveldb for main blockchain and beacon. - if nodeConfig.MainDB, err = InitLDBDatabase(*ip, *port, *freshDB, false); err != nil { - panic(err) - } - if myShardID != 0 { - if nodeConfig.BeaconDB, err = InitLDBDatabase(*ip, *port, *freshDB, true); err != nil { - panic(err) - } - } - nodeConfig.SelfPeer = p2p.Peer{IP: *ip, Port: *port, ConsensusPubKey: nodeConfig.ConsensusPubKey} if *accountIndex%core.GenesisShardSize == 0 { // The first node in a shard is the leader at genesis @@ -258,6 +252,8 @@ func createGlobalConfig() *nodeconfig.ConfigType { nodeConfig.Host.AddPeer(&nodeConfig.Leader) + nodeConfig.DBDir = *dbDir + return nodeConfig } @@ -273,7 +269,8 @@ func setUpConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node { currentConsensus.MinPeers = *minPeers // Current node. - currentNode := node.New(nodeConfig.Host, currentConsensus, nodeConfig.MainDB, *isArchival) + chainDBFactory := &shardchain.LDBFactory{RootDir: nodeConfig.DBDir} + currentNode := node.New(nodeConfig.Host, currentConsensus, chainDBFactory, *isArchival) currentNode.NodeConfig.SetRole(nodeconfig.NewNode) currentNode.StakingAccount = myAccount utils.GetLogInstance().Info("node account set", @@ -348,10 +345,6 @@ func setUpConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node { currentNode.Consensus.RegisterRndChannel(dRand.RndChannel) currentNode.DRand = dRand - if currentConsensus.ShardID != 0 { - currentNode.AddBeaconChainDatabase(nodeConfig.BeaconDB) - } - // This needs to be executed after consensus and drand are setup if !*isNewNode || *shardID > -1 { // initial staking new node doesn't need to initialize shard state currentNode.InitShardState(*shardID == -1 && !*isNewNode) // TODO: Have a better why to distinguish non-genesis node @@ -395,7 +388,15 @@ func main() { // go currentNode.SupportBeaconSyncing() //} - utils.GetLogInstance().Info("==== New Harmony Node ====", "BlsPubKey", hex.EncodeToString(nodeConfig.ConsensusPubKey.Serialize()), "ShardID", nodeConfig.ShardID, "ShardGroupID", nodeConfig.GetShardGroupID(), "BeaconGroupID", nodeConfig.GetBeaconGroupID(), "ClientGroupID", nodeConfig.GetClientGroupID(), "Role", currentNode.NodeConfig.Role(), "multiaddress", fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", *ip, *port, nodeConfig.Host.GetID().Pretty())) + utils.GetLogInstance().Info("==== New Harmony Node ====", + "BlsPubKey", hex.EncodeToString(nodeConfig.ConsensusPubKey.Serialize()), + "ShardID", nodeConfig.ShardID, + "ShardGroupID", nodeConfig.GetShardGroupID(), + "BeaconGroupID", nodeConfig.GetBeaconGroupID(), + "ClientGroupID", nodeConfig.GetClientGroupID(), + "Role", currentNode.NodeConfig.Role(), + "multiaddress", fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", + *ip, *port, nodeConfig.Host.GetID().Pretty())) currentNode.MaybeKeepSendingPongMessage() go currentNode.SupportSyncing() diff --git a/consensus/consensus.go b/consensus/consensus.go index 0ccda6d8d..05b96ab70 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -2,12 +2,12 @@ package consensus // consensus import ( - "errors" "math/big" "sync" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" "github.com/harmony-one/bls/ffi/go/bls" @@ -125,7 +125,7 @@ type Consensus struct { // Called when consensus on a new block is done OnConsensusDone func(*types.Block) // The verifier func passed from Node object - BlockVerifier func(*types.Block) bool + BlockVerifier func(*types.Block) error // verified block to state sync broadcast VerifiedNewBlock chan *types.Block @@ -246,61 +246,82 @@ func New(host p2p.Host, ShardID uint32, leader p2p.Peer, blsPriKey *bls.SecretKe return &consensus, nil } -// AccumulateRewards credits the coinbase of the given block with the mining +// accumulateRewards credits the coinbase of the given block with the mining // reward. The total reward consists of the static block reward and rewards for // included uncles. The coinbase of each uncle block is also rewarded. -func (consensus *Consensus) accumulateRewards( - config *params.ChainConfig, state *state.DB, header *types.Header, +func accumulateRewards( + bc consensus_engine.ChainReader, state *state.DB, header *types.Header, ) error { - logger := utils.GetLogInstance().New("parentHash", header.ParentHash) - if header.ParentHash == (common.Hash{}) { - // This block is a genesis block, - // without a parent block whose signer to reward. + logger := header.Logger(utils.GetLogInstance()) + getLogger := func() log.Logger { return utils.WithCallerSkip(logger, 1) } + blockNum := header.Number.Uint64() + if blockNum == 0 { + // Epoch block has no parent to reward. return nil } - if consensus.ChainReader == nil { - return errors.New("ChainReader is nil") + // TODO ek – retrieving by parent number (blockNum - 1) doesn't work, + // while it is okay with hash. Sounds like DB inconsistency. + // Figure out why. + parentHeader := bc.GetHeaderByHash(header.ParentHash) + if parentHeader == nil { + return ctxerror.New("cannot find parent block header in DB", + "parentHash", header.ParentHash) } - parent := consensus.ChainReader.GetHeaderByHash(header.ParentHash) - if parent == nil { - return ctxerror.New("cannot retrieve parent header", - "parentHash", header.ParentHash, + if parentHeader.Number.Cmp(common.Big0) == 0 { + // Parent is an epoch block, + // which is not signed in the usual manner therefore rewards nothing. + return nil + } + parentShardState, err := bc.ReadShardState(parentHeader.Epoch) + if err != nil { + return ctxerror.New("cannot read shard state", + "epoch", parentHeader.Epoch, + ).WithCause(err) + } + parentCommittee := parentShardState.FindCommitteeByID(parentHeader.ShardID) + if parentCommittee == nil { + return ctxerror.New("cannot find shard in the shard state", + "parentBlockNumber", parentHeader.Number, + "shardID", parentHeader.ShardID, ) } - mask, err := bls_cosi.NewMask(consensus.PublicKeys, nil) + var committerKeys []*bls.PublicKey + for _, member := range parentCommittee.NodeList { + committerKey := new(bls.PublicKey) + err := member.BlsPublicKey.ToLibBLSPublicKey(committerKey) + if err != nil { + return ctxerror.New("cannot convert BLS public key", + "blsPublicKey", member.BlsPublicKey).WithCause(err) + } + committerKeys = append(committerKeys, committerKey) + } + mask, err := bls_cosi.NewMask(committerKeys, nil) if err != nil { return ctxerror.New("cannot create group sig mask").WithCause(err) } - logger.Debug("accumulateRewards: setting group sig mask", - "destLen", mask.Len(), - "srcLen", len(parent.CommitBitmap), - ) - if err := mask.SetMask(parent.CommitBitmap); err != nil { + if err := mask.SetMask(parentHeader.CommitBitmap); err != nil { return ctxerror.New("cannot set group sig mask bits").WithCause(err) } totalAmount := big.NewInt(0) numAccounts := 0 - signingKeys := mask.GetPubKeyFromMask(true) - for _, key := range signingKeys { - stakeInfos := consensus.stakeInfoFinder.FindStakeInfoByNodeKey(key) - if len(stakeInfos) == 0 { - logger.Error("accumulateRewards: node has no stake info", - "nodeKey", key.GetHexString()) + for idx, member := range parentCommittee.NodeList { + if signed, err := mask.IndexEnabled(idx); err != nil { + return ctxerror.New("cannot check for committer bit", + "committerIndex", idx, + ).WithCause(err) + } else if !signed { continue } - numAccounts += len(stakeInfos) - for _, stakeInfo := range stakeInfos { - utils.GetLogInstance().Info("accumulateRewards: rewarding", - "block", header.Hash(), - "account", stakeInfo.Account, - "node", stakeInfo.BlsPublicKey.Hex(), - "amount", BlockReward) - state.AddBalance(stakeInfo.Account, BlockReward) - totalAmount = new(big.Int).Add(totalAmount, BlockReward) - } + numAccounts++ + account := common.HexToAddress(member.EcdsaAddress) + getLogger().Info("rewarding block signer", + "account", account, + "node", member.BlsPublicKey.Hex(), + "amount", BlockReward) + state.AddBalance(account, BlockReward) + totalAmount = new(big.Int).Add(totalAmount, BlockReward) } - logger.Debug("accumulateRewards: paid out block reward", - "numSigs", len(signingKeys), + getLogger().Debug("paid out block reward", "numAccounts", numAccounts, "totalAmount", totalAmount) return nil diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 90dce4845..416f3f7ed 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -9,9 +9,9 @@ import ( "github.com/ethereum/go-ethereum/rlp" protobuf "github.com/golang/protobuf/proto" "github.com/harmony-one/bls/ffi/go/bls" + msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/api/service/explorer" - "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" bls_cosi "github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/internal/profiler" @@ -51,20 +51,21 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan *types.Block, stop // TODO: think about potential race condition if consensus.ShardID == 0 { - if core.IsEpochBlock(newBlock) { // Only beacon chain do randomness generation - // Receive pRnd from DRG protocol - utils.GetLogInstance().Debug("[DRG] Waiting for pRnd") - pRndAndBitmap := <-consensus.PRndChannel - utils.GetLogInstance().Debug("[DRG] Got pRnd", "pRnd", pRndAndBitmap) - pRnd := [32]byte{} - copy(pRnd[:], pRndAndBitmap[:32]) - bitmap := pRndAndBitmap[32:] - vrfBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.ConsensusPubKey) - vrfBitmap.SetMask(bitmap) - - // TODO: check validity of pRnd - newBlock.AddRandPreimage(pRnd) - } + // TODO ek/rj - re-enable this after fixing DRand + //if core.IsEpochBlock(newBlock) { // Only beacon chain do randomness generation + // // Receive pRnd from DRG protocol + // utils.GetLogInstance().Debug("[DRG] Waiting for pRnd") + // pRndAndBitmap := <-consensus.PRndChannel + // utils.GetLogInstance().Debug("[DRG] Got pRnd", "pRnd", pRndAndBitmap) + // pRnd := [32]byte{} + // copy(pRnd[:], pRndAndBitmap[:32]) + // bitmap := pRndAndBitmap[32:] + // vrfBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.ConsensusPubKey) + // vrfBitmap.SetMask(bitmap) + // + // // TODO: check validity of pRnd + // newBlock.AddRandPreimage(pRnd) + //} rnd, blockHash, err := consensus.GetNextRnd() if err == nil { diff --git a/consensus/consensus_service.go b/consensus/consensus_service.go index 64f692bc6..92926e1a6 100644 --- a/consensus/consensus_service.go +++ b/consensus/consensus_service.go @@ -12,6 +12,9 @@ import ( "github.com/ethereum/go-ethereum/rlp" protobuf "github.com/golang/protobuf/proto" "github.com/harmony-one/bls/ffi/go/bls" + libp2p_peer "github.com/libp2p/go-libp2p-peer" + "golang.org/x/crypto/sha3" + proto_discovery "github.com/harmony-one/harmony/api/proto/discovery" msg_pb "github.com/harmony-one/harmony/api/proto/message" consensus_engine "github.com/harmony-one/harmony/consensus/engine" @@ -19,11 +22,10 @@ import ( "github.com/harmony-one/harmony/core/types" bls_cosi "github.com/harmony-one/harmony/crypto/bls" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" + "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/host" - libp2p_peer "github.com/libp2p/go-libp2p-peer" - "golang.org/x/crypto/sha3" ) // WaitForNewRandomness listens to the RndChannel to receive new VDF randomness. @@ -244,7 +246,9 @@ func (consensus *Consensus) VerifySeal(chain consensus_engine.ChainReader, heade func (consensus *Consensus) Finalize(chain consensus_engine.ChainReader, header *types.Header, state *state.DB, txs []*types.Transaction, receipts []*types.Receipt) (*types.Block, error) { // Accumulate any block and uncle rewards and commit the final state root // Header seems complete, assemble into a block and return - consensus.accumulateRewards(chain.Config(), state, header) + if err := accumulateRewards(chain, state, header); err != nil { + return nil, ctxerror.New("cannot pay block reward").WithCause(err) + } header.Root = state.IntermediateRoot(false) return types.NewBlock(header, txs, receipts), nil } @@ -533,7 +537,10 @@ func (consensus *Consensus) checkConsensusMessage(message *msg_pb.Message, publi // Verify message signature err := verifyMessageSig(publicKey, message) if err != nil { - utils.GetLogInstance().Warn("Failed to verify the message signature", "Error", err) + ctxerror.Log15(utils.GetLogger().Warn, + ctxerror.New("failed to verify the message signature", + "publicKey", publicKey.GetHexString(), + ).WithCause(err)) return consensus_engine.ErrInvalidConsensusMessage } if !bytes.Equal(blockHash, consensus.blockHash[:]) { diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index eed31bfad..b5a0d86d8 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -8,13 +8,14 @@ import ( protobuf "github.com/golang/protobuf/proto" "github.com/harmony-one/bls/ffi/go/bls" + "github.com/harmony-one/harmony/api/proto" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/api/service/explorer" - "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" bls_cosi "github.com/harmony-one/harmony/crypto/bls" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" + "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/host" @@ -140,6 +141,16 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { utils.GetLogInstance().Warn("onAnnounce block content is not verified successfully", "error", err) return } + if consensus.BlockVerifier == nil { + // do nothing + } else if err := consensus.BlockVerifier(&blockObj); err != nil { + // TODO ek – maybe we could do this in commit phase + err := ctxerror.New("block verification failed", + "blockHash", blockObj.Hash(), + ).WithCause(err) + ctxerror.Log15(utils.GetLogInstance().Warn, err) + return + } logMsgs := consensus.pbftLog.GetMessagesByTypeSeqView(msg_pb.MessageType_ANNOUNCE, recvMsg.BlockNum, recvMsg.ViewID) if len(logMsgs) > 0 { @@ -697,20 +708,21 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan case newBlock := <-blockChannel: utils.GetLogInstance().Info("receive newBlock", "blockNum", newBlock.NumberU64()) if consensus.ShardID == 0 { - if core.IsEpochBlock(newBlock) { // Only beacon chain do randomness generation - // Receive pRnd from DRG protocol - utils.GetLogInstance().Debug("[DRG] Waiting for pRnd") - pRndAndBitmap := <-consensus.PRndChannel - utils.GetLogInstance().Debug("[DRG] Got pRnd", "pRnd", pRndAndBitmap) - pRnd := [32]byte{} - copy(pRnd[:], pRndAndBitmap[:32]) - bitmap := pRndAndBitmap[32:] - vrfBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.ConsensusPubKey) - vrfBitmap.SetMask(bitmap) - - // TODO: check validity of pRnd - newBlock.AddRandPreimage(pRnd) - } + // TODO ek/rj - re-enable this after fixing DRand + //if core.IsEpochBlock(newBlock) { // Only beacon chain do randomness generation + // // Receive pRnd from DRG protocol + // utils.GetLogInstance().Debug("[DRG] Waiting for pRnd") + // pRndAndBitmap := <-consensus.PRndChannel + // utils.GetLogInstance().Debug("[DRG] Got pRnd", "pRnd", pRndAndBitmap) + // pRnd := [32]byte{} + // copy(pRnd[:], pRndAndBitmap[:32]) + // bitmap := pRndAndBitmap[32:] + // vrfBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.ConsensusPubKey) + // vrfBitmap.SetMask(bitmap) + // + // // TODO: check validity of pRnd + // newBlock.AddRandPreimage(pRnd) + //} rnd, blockHash, err := consensus.GetNextRnd() if err == nil { diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 91a284f1a..6d0fde614 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -9,6 +9,7 @@ import ( "github.com/harmony-one/harmony/core/types" bls_cosi "github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/internal/attack" + "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/host" @@ -82,6 +83,16 @@ func (consensus *Consensus) processAnnounceMessage(message *msg_pb.Message) { utils.GetLogInstance().Warn("Block content is not verified successfully", "error", err) return } + if consensus.BlockVerifier == nil { + // do nothing + } else if err := consensus.BlockVerifier(&blockObj); err != nil { + // TODO ek – maybe we could do this in commit phase + err := ctxerror.New("block verification failed", + "blockHash", blockObj.Hash(), + ).WithCause(err) + ctxerror.Log15(utils.GetLogInstance().Warn, err) + return + } // Construct and send prepare message msgToSend := consensus.constructPrepareMessage() diff --git a/consensus/consensus_validator_test.go b/consensus/consensus_validator_test.go index 29353a05b..81f463728 100644 --- a/consensus/consensus_validator_test.go +++ b/consensus/consensus_validator_test.go @@ -2,14 +2,15 @@ package consensus import ( "encoding/hex" + "math/big" "testing" "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/params" - "github.com/golang/mock/gomock" protobuf "github.com/golang/protobuf/proto" + "github.com/harmony-one/harmony/api/proto" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/core/types" @@ -18,7 +19,6 @@ import ( "github.com/harmony-one/harmony/p2p" mock_host "github.com/harmony-one/harmony/p2p/host/mock" "github.com/harmony-one/harmony/p2p/p2pimpl" - //"github.com/stretchr/testify/assert" ) type MockChainReader struct{} @@ -48,6 +48,10 @@ func (MockChainReader) GetBlock(hash common.Hash, number uint64) *types.Block { return &types.Block{} } +func (MockChainReader) ReadShardState(epoch *big.Int) (types.ShardState, error) { + return nil, nil +} + func TestProcessMessageValidatorAnnounce(test *testing.T) { ctrl := gomock.NewController(test) defer ctrl.Finish() @@ -118,10 +122,10 @@ func TestProcessMessageValidatorAnnounce(test *testing.T) { func testBlockBytes() ([]byte, error) { return hex.DecodeString("" + - // BEGIN 673-byte Block - "f902a1" + - // BEGIN 668-byte header - "f9029c" + + // BEGIN 675-byte Block + "f902a3" + + // BEGIN 670-byte header + "f9029e" + // 32-byte ParentHash "a0" + "0000000000000000000000000000000000000000000000000000000000000000" + @@ -171,6 +175,9 @@ func testBlockBytes() ([]byte, error) { // 8-byte Nonce "88" + "0000000000000000" + + // 0-byte Epoch + "80" + + "" + // single-byte ShardID "01" + // 48-byte PrepareSignature @@ -196,6 +203,9 @@ func testBlockBytes() ([]byte, error) { // 32-byte ShardStateHash "a0" + "0000000000000000000000000000000000000000000000000000000000000000" + + // BEGIN 0-byte ShardState + "c0" + + // END ShardState // END header // BEGIN 0-byte uncles "c0" + diff --git a/consensus/engine/consensus_engine.go b/consensus/engine/consensus_engine.go index 98b26cf0d..bbbf46d4b 100644 --- a/consensus/engine/consensus_engine.go +++ b/consensus/engine/consensus_engine.go @@ -1,8 +1,11 @@ package engine import ( + "math/big" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/params" + "github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/types" ) @@ -28,6 +31,9 @@ type ChainReader interface { // GetBlock retrieves a block from the database by hash and number. GetBlock(hash common.Hash, number uint64) *types.Block + + // ReadShardState retrieves sharding state given the epoch number. + ReadShardState(epoch *big.Int) (types.ShardState, error) } // Engine is an algorithm agnostic consensus engine. diff --git a/contracts/contract_caller.go b/contracts/contract_caller.go index 5f27f922b..fc147b7d1 100644 --- a/contracts/contract_caller.go +++ b/contracts/contract_caller.go @@ -6,8 +6,8 @@ import ( "github.com/harmony-one/harmony/internal/utils" "github.com/ethereum/go-ethereum/common/math" - "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/params" + "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/vm" @@ -15,7 +15,6 @@ import ( // ContractCaller is used to call smart contract locally type ContractCaller struct { - database *ethdb.Database blockchain *core.BlockChain // Ethereum blockchain to handle the consensus mu sync.Mutex @@ -23,9 +22,8 @@ type ContractCaller struct { } // NewContractCaller initialize a new contract caller. -func NewContractCaller(db *ethdb.Database, bc *core.BlockChain, config *params.ChainConfig) *ContractCaller { +func NewContractCaller(bc *core.BlockChain, config *params.ChainConfig) *ContractCaller { cc := ContractCaller{} - cc.database = db cc.blockchain = bc cc.mu = sync.Mutex{} cc.config = config diff --git a/core/blockchain.go b/core/blockchain.go index 5f1ce92cd..6789c4c58 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -27,9 +27,6 @@ import ( "sync/atomic" "time" - "github.com/harmony-one/harmony/contracts/structs" - lru "github.com/hashicorp/golang-lru" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/common/prque" @@ -41,11 +38,15 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/trie" + lru "github.com/hashicorp/golang-lru" + consensus_engine "github.com/harmony-one/harmony/consensus/engine" + "github.com/harmony-one/harmony/contracts/structs" "github.com/harmony-one/harmony/core/rawdb" "github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/vm" + "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" ) @@ -66,11 +67,15 @@ const ( badBlockLimit = 10 triesInMemory = 128 shardCacheLimit = 2 + epochCacheLimit = 10 // BlocksPerEpoch is the number of blocks in one epoch // currently set to small number for testing // in future, this need to be adjusted dynamically instead of constant - BlocksPerEpoch = 10000 + // TODO ek – inflate to disable resharding until we can 1) fix shard + // state mutation bug and 2) implement key passphrase recycle across + // process restart (exec) for shard migration + BlocksPerEpoch = 1000000000000 // BlockChainVersion ensures that an incompatible database forces a resync from scratch. BlockChainVersion = 3 @@ -130,6 +135,7 @@ type BlockChain struct { blockCache *lru.Cache // Cache for the most recent entire blocks futureBlocks *lru.Cache // future blocks are blocks added for later processing shardStateCache *lru.Cache + epochCache *lru.Cache // Cache epoch number → first block number quit chan struct{} // blockchain quit channel running int32 // running must be called atomically @@ -163,6 +169,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par futureBlocks, _ := lru.New(maxFutureBlocks) badBlocks, _ := lru.New(badBlockLimit) shardCache, _ := lru.New(shardCacheLimit) + epochCache, _ := lru.New(epochCacheLimit) bc := &BlockChain{ chainConfig: chainConfig, @@ -178,6 +185,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, chainConfig *par blockCache: blockCache, futureBlocks: futureBlocks, shardStateCache: shardCache, + epochCache: epochCache, engine: engine, vmConfig: vmConfig, badBlocks: badBlocks, @@ -1072,6 +1080,23 @@ func (bc *BlockChain) WriteBlockWithState(block *types.Block, receipts []*types. func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { n, events, logs, err := bc.insertChain(chain) bc.PostChainEvents(events, logs) + // TODO ek – make this a post-chain event + if err == nil { + for idx, block := range chain { + header := block.Header() + header.Logger(utils.GetLogger()).Info("added block to chain", + "segmentIndex", idx, + "parentHash", header.ParentHash) + if header.ShardStateHash != (common.Hash{}) { + epoch := new(big.Int).Add(header.Epoch, common.Big1) + err = bc.WriteShardState(epoch, header.ShardState) + if err != nil { + ctxerror.Log15(header.Logger(utils.GetLogger()).Warn, + ctxerror.New("cannot store shard state").WithCause(err)) + } + } + } + } return n, err } @@ -1651,36 +1676,33 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript return bc.scope.Track(bc.logsFeed.Subscribe(ch)) } -// GetShardState retrieves sharding state given block hash and block number -func (bc *BlockChain) GetShardState(hash common.Hash, number uint64) types.ShardState { - if cached, ok := bc.shardStateCache.Get(hash); ok { +// ReadShardState retrieves sharding state given the epoch number. +func (bc *BlockChain) ReadShardState(epoch *big.Int) (types.ShardState, error) { + cacheKey := string(epoch.Bytes()) + if cached, ok := bc.shardStateCache.Get(cacheKey); ok { shardState := cached.(types.ShardState) - return shardState + return shardState, nil } - shardState := rawdb.ReadShardState(bc.db, hash, number) - if shardState == nil { - return nil - } - bc.shardStateCache.Add(hash, shardState) - return shardState -} - -// GetShardStateByNumber retrieves sharding state given the block number -func (bc *BlockChain) GetShardStateByNumber(number uint64) types.ShardState { - hash := rawdb.ReadCanonicalHash(bc.db, number) - if hash == (common.Hash{}) { - return nil + shardState, err := rawdb.ReadShardState(bc.db, epoch) + if err != nil { + return nil, err } - return bc.GetShardState(hash, number) + bc.shardStateCache.Add(cacheKey, shardState) + return shardState, nil } -// GetShardStateByHash retrieves the shard state given the blockhash, return nil if not exist -func (bc *BlockChain) GetShardStateByHash(hash common.Hash) types.ShardState { - number := bc.hc.GetBlockNumber(hash) - if number == nil { - return nil +// WriteShardState saves the given sharding state under the given epoch number. +func (bc *BlockChain) WriteShardState( + epoch *big.Int, shardState types.ShardState, +) error { + shardState = shardState.DeepCopy() + err := rawdb.WriteShardState(bc.db, epoch, shardState) + if err != nil { + return err } - return bc.GetShardState(hash, *number) + cacheKey := string(epoch.Bytes()) + bc.shardStateCache.Add(cacheKey, shardState) + return nil } // GetRandSeedByNumber retrieves the rand seed given the block number, return 0 if not exist @@ -1701,46 +1723,60 @@ func (bc *BlockChain) GetRandPreimageByNumber(number uint64) [32]byte { return header.RandPreimage } -// GetNewShardState will calculate (if not exist) and get the new shard state for epoch block or nil if block is not epoch block -// epoch block is where the new shard state stored -func (bc *BlockChain) GetNewShardState(block *types.Block, stakeInfo *map[common.Address]*structs.StakeInfo) types.ShardState { - hash := block.Hash() - // just ignore non-epoch block - if !IsEpochBlock(block) { - return nil +// GetShardState returns the shard state for the given epoch, +// creating one if needed. +func (bc *BlockChain) GetShardState( + epoch *big.Int, stakeInfo *map[common.Address]*structs.StakeInfo, +) (types.ShardState, error) { + shardState, err := bc.ReadShardState(epoch) + if err == nil { // TODO ek – distinguish ErrNotFound + return shardState, err + } + shardState, err = CalculateNewShardState(bc, epoch, stakeInfo) + if err != nil { + return nil, err } - number := block.NumberU64() - shardState := bc.GetShardState(hash, number) - if shardState == nil { - epoch := GetEpochFromBlockNumber(number) - shardState = CalculateNewShardState(bc, epoch, stakeInfo) - bc.shardStateCache.Add(hash, shardState) + err = bc.WriteShardState(epoch, shardState) + if err != nil { + return nil, err } - return shardState + utils.GetLogger().Debug("saved new shard state", "epoch", epoch) + return shardState, nil } -// ValidateNewShardState validate whether the new shard state root matches -func (bc *BlockChain) ValidateNewShardState(block *types.Block, stakeInfo *map[common.Address]*structs.StakeInfo) error { - shardState := bc.GetNewShardState(block, stakeInfo) - if shardState == nil { - return nil +// ChainDb returns the database +func (bc *BlockChain) ChainDb() ethdb.Database { return bc.db } + +// GetEpochBlockNumber returns the first block number of the given epoch. +func (bc *BlockChain) GetEpochBlockNumber(epoch *big.Int) (*big.Int, error) { + // Try cache first + cacheKey := string(epoch.Bytes()) + if cachedValue, ok := bc.epochCache.Get(cacheKey); ok { + return (&big.Int{}).SetBytes([]byte(cachedValue.(string))), nil } - if shardState.Hash() != block.Header().ShardStateHash { - return ErrShardStateNotMatch + blockNum, err := rawdb.ReadEpochBlockNumber(bc.db, epoch) + if err != nil { + return nil, ctxerror.New("cannot read epoch block number from database", + "epoch", epoch, + ).WithCause(err) + } + cachedValue := []byte(blockNum.Bytes()) + bc.epochCache.Add(cacheKey, cachedValue) + return blockNum, nil +} + +// StoreEpochBlockNumber stores the given epoch-first block number. +func (bc *BlockChain) StoreEpochBlockNumber( + epoch *big.Int, blockNum *big.Int, +) error { + cacheKey := string(epoch.Bytes()) + cachedValue := []byte(blockNum.Bytes()) + bc.epochCache.Add(cacheKey, cachedValue) + if err := rawdb.WriteEpochBlockNumber(bc.db, epoch, blockNum); err != nil { + return ctxerror.New("cannot write epoch block number to database", + "epoch", epoch, + "epochBlockNum", blockNum, + ).WithCause(err) } - utils.GetLogInstance().Debug("[resharding] validate new shard state successfully", "shardStateHash", shardState.Hash()) return nil } - -// StoreNewShardState insert new shard state into epoch block -func (bc *BlockChain) StoreNewShardState(block *types.Block, stakeInfo *map[common.Address]*structs.StakeInfo) types.ShardState { - // write state into db. - shardState := bc.GetNewShardState(block, stakeInfo) - if shardState != nil { - hash := block.Hash() - number := block.NumberU64() - rawdb.WriteShardState(bc.db, hash, number, shardState) - utils.GetLogInstance().Debug("[Resharding] Saved new shard state successfully", "epoch", GetEpochFromBlockNumber(block.NumberU64())) - } - return shardState -} diff --git a/core/chain_makers.go b/core/chain_makers.go index 80686ee2c..60bb747d6 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -23,6 +23,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/params" + consensus_engine "github.com/harmony-one/harmony/consensus/engine" "github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/types" @@ -189,7 +190,10 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse } if b.engine != nil { // Finalize and seal the block - block, _ := b.engine.Finalize(chainreader, b.header, statedb, b.txs, b.receipts) + block, err := b.engine.Finalize(chainreader, b.header, statedb, b.txs, b.receipts) + if err != nil { + panic(err) + } // Write state changes to db root, err := statedb.Commit(config.IsEIP158(b.header.Number)) @@ -273,3 +277,4 @@ func (cr *fakeChainReader) GetHeaderByNumber(number uint64) *types.Header func (cr *fakeChainReader) GetHeaderByHash(hash common.Hash) *types.Header { return nil } func (cr *fakeChainReader) GetHeader(hash common.Hash, number uint64) *types.Header { return nil } func (cr *fakeChainReader) GetBlock(hash common.Hash, number uint64) *types.Block { return nil } +func (cr *fakeChainReader) ReadShardState(epoch *big.Int) (types.ShardState, error) { return nil, nil } diff --git a/core/genesis.go b/core/genesis.go index 1e0bdb533..636d944e2 100644 --- a/core/genesis.go +++ b/core/genesis.go @@ -56,6 +56,7 @@ type Genesis struct { Coinbase common.Address `json:"coinbase"` Alloc GenesisAlloc `json:"alloc" gencodec:"required"` ShardStateHash common.Hash `json:"shardStateHash"` + ShardState types.ShardState `json:"shardState"` // These fields are used for consensus tests. Please don't use them // in actual genesis blocks. @@ -240,6 +241,7 @@ func (g *Genesis) ToBlock(db ethdb.Database) *types.Block { head := &types.Header{ Number: new(big.Int).SetUint64(g.Number), Nonce: types.EncodeNonce(g.Nonce), + Epoch: big.NewInt(0), ShardID: g.ShardID, Time: new(big.Int).SetUint64(g.Timestamp), ParentHash: g.ParentHash, @@ -251,6 +253,7 @@ func (g *Genesis) ToBlock(db ethdb.Database) *types.Block { Coinbase: g.Coinbase, Root: root, ShardStateHash: g.ShardStateHash, + // ShardState is absent in epoch block; genesis shard state is implied } if g.GasLimit == 0 { head.GasLimit = 10000000000 // TODO(RJ): figure out better solution. // params.GenesisGasLimit diff --git a/core/rawdb/accessors_chain.go b/core/rawdb/accessors_chain.go index 42f0ea2dd..e4bc679d9 100644 --- a/core/rawdb/accessors_chain.go +++ b/core/rawdb/accessors_chain.go @@ -24,7 +24,10 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" + "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/internal/ctxerror" + "github.com/harmony-one/harmony/internal/utils" ) // ReadCanonicalHash retrieves the hash assigned to a canonical block number. @@ -337,6 +340,30 @@ func ReadBlock(db DatabaseReader, hash common.Hash, number uint64) *types.Block func WriteBlock(db DatabaseWriter, block *types.Block) { WriteBody(db, block.Hash(), block.NumberU64(), block.Body()) WriteHeader(db, block.Header()) + // TODO ek – maybe roll the below into WriteHeader() + epoch := block.Header().Epoch + if epoch == nil { + // backward compatibility + return + } + epoch = new(big.Int).Set(epoch) + epochBlockNum := block.Number() + writeOne := func() { + if err := WriteEpochBlockNumber(db, epoch, epochBlockNum); err != nil { + ctxerror.Log15(utils.GetLogInstance().Error, err) + } + } + // A block may be a genesis block AND end-of-epoch block at the same time. + if epochBlockNum.Sign() == 0 { + // Genesis block; record this block's epoch and block numbers. + writeOne() + } + if len(block.Header().ShardState) > 0 { + // End-of-epoch block; record the next epoch after this block. + epoch = new(big.Int).Add(epoch, common.Big1) + epochBlockNum = new(big.Int).Add(epochBlockNum, common.Big1) + writeOne() + } } // DeleteBlock removes all block data associated with a hash. @@ -375,26 +402,56 @@ func FindCommonAncestor(db DatabaseReader, a, b *types.Header) *types.Header { } // ReadShardState retrieves sharding state. -func ReadShardState(db DatabaseReader, hash common.Hash, number uint64) types.ShardState { - data, _ := db.Get(shardStateKey(number, hash)) - if len(data) == 0 { - return nil +func ReadShardState( + db DatabaseReader, epoch *big.Int, +) (shardState types.ShardState, err error) { + var data []byte + data, err = db.Get(shardStateKey(epoch)) + if err != nil { + return nil, ctxerror.New("cannot read sharding state from rawdb", + "epoch", epoch, + ).WithCause(err) } - shardState := types.ShardState{} - if err := rlp.DecodeBytes(data, &shardState); err != nil { - log.Error("Fail to decode sharding state", "hash", hash, "number", number, "err", err) - return nil + if err = rlp.DecodeBytes(data, &shardState); err != nil { + return nil, ctxerror.New("cannot decode sharding state", + "epoch", epoch, + ).WithCause(err) } - return shardState + return shardState, nil } // WriteShardState stores sharding state into database. -func WriteShardState(db DatabaseWriter, hash common.Hash, number uint64, shardState types.ShardState) { +func WriteShardState( + db DatabaseWriter, epoch *big.Int, shardState types.ShardState, +) (err error) { data, err := rlp.EncodeToBytes(shardState) if err != nil { - log.Crit("Failed to encode sharding state", "err", err) + return ctxerror.New("cannot encode sharding state", + "epoch", epoch, + ).WithCause(err) } - if err := db.Put(shardStateKey(number, hash), data); err != nil { - log.Crit("Failed to store sharding state", "err", err) + if err = db.Put(shardStateKey(epoch), data); err != nil { + return ctxerror.New("cannot write sharding state", + "epoch", epoch, + ).WithCause(err) } + utils.GetLogger().Info("wrote sharding state", + "epoch", epoch, "numShards", len(shardState)) + return nil +} + +// ReadEpochBlockNumber retrieves the epoch block number for the given epoch, +// or nil if the given epoch is not found in the database. +func ReadEpochBlockNumber(db DatabaseReader, epoch *big.Int) (*big.Int, error) { + data, err := db.Get(epochBlockNumberKey(epoch)) + if err != nil { + return nil, err + } + return new(big.Int).SetBytes(data), nil +} + +// WriteEpochBlockNumber stores the given epoch-number-to-epoch-block-number +// in the database. +func WriteEpochBlockNumber(db DatabaseWriter, epoch, blockNum *big.Int) error { + return db.Put(epochBlockNumberKey(epoch), blockNum.Bytes()) } diff --git a/core/rawdb/accessors_chain_test.go b/core/rawdb/accessors_chain_test.go index f938db0e0..a0de5029e 100644 --- a/core/rawdb/accessors_chain_test.go +++ b/core/rawdb/accessors_chain_test.go @@ -19,12 +19,17 @@ package rawdb import ( "bytes" "math/big" + "os" + "reflect" "testing" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/rlp" + "github.com/golang/mock/gomock" + mock "github.com/harmony-one/harmony/core/rawdb/mock" "github.com/harmony-one/harmony/core/types" + "github.com/syndtr/goleveldb/leveldb" "golang.org/x/crypto/sha3" ) @@ -108,6 +113,11 @@ func TestBlockStorage(t *testing.T) { Extra: []byte("test block"), TxHash: types.EmptyRootHash, ReceiptHash: types.EmptyRootHash, + Epoch: big.NewInt(0), + Number: big.NewInt(0), + ShardState: types.ShardState{ + {}, + }, }) if entry := ReadBlock(db, block.Hash(), block.NumberU64()); entry != nil { t.Fatalf("Non existent block returned: %v", entry) @@ -135,6 +145,16 @@ func TestBlockStorage(t *testing.T) { } else if types.DeriveSha(types.Transactions(entry.Transactions)) != types.DeriveSha(block.Transactions()) || types.CalcUncleHash(entry.Uncles) != types.CalcUncleHash(block.Uncles()) { t.Fatalf("Retrieved body mismatch: have %v, want %v", entry, block.Body()) } + if actual, err := ReadEpochBlockNumber(db, big.NewInt(0)); err != nil { + t.Fatalf("Genesis epoch block number not found, error=%#v", err) + } else if expected := big.NewInt(0); actual.Cmp(expected) != 0 { + t.Fatalf("Genesis epoch block number mismatch: have %v, want %v", actual, expected) + } + if actual, err := ReadEpochBlockNumber(db, big.NewInt(1)); err != nil { + t.Fatalf("Next epoch block number not found, error=%#v", err) + } else if expected := big.NewInt(1); actual.Cmp(expected) != 0 { + t.Fatalf("Next epoch block number mismatch: have %v, want %v", actual, expected) + } // Delete the block and verify the execution DeleteBlock(db, block.Hash(), block.NumberU64()) if entry := ReadBlock(db, block.Hash(), block.NumberU64()); entry != nil { @@ -315,3 +335,154 @@ func TestBlockReceiptStorage(t *testing.T) { t.Fatalf("deleted receipts returned: %v", rs) } } + +func TestReadEpochBlockNumber(t *testing.T) { + type args struct { + // db is mocked + epoch *big.Int + } + type dbCall struct { + key []byte + data []byte + error error + } + tests := []struct { + name string + args args + dbCall dbCall + want *big.Int + wantErr bool + }{ + { + "0", + args{epoch: big.NewInt(0)}, + dbCall{ + key: []byte{}, + data: []byte{}, + error: nil, + }, + big.NewInt(0), + false, + }, + { + "1", + args{epoch: big.NewInt(1)}, + dbCall{ + key: []byte{0x01}, + data: []byte{0x64}, + error: nil, + }, + big.NewInt(100), + false, + }, + { + "1000", + args{epoch: big.NewInt(1000)}, + dbCall{ + key: []byte{0x03, 0xe8}, + data: []byte{0x01, 0x86, 0xa0}, + error: nil, + }, + big.NewInt(100000), + false, + }, + { + "error", + args{epoch: big.NewInt(0)}, + dbCall{ + key: []byte{}, + data: []byte{}, + error: leveldb.ErrNotFound, + }, + nil, + true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + db := mock.NewMockDatabaseReader(ctrl) + fullKey := append(epochBlockNumberPrefix, tt.dbCall.key...) + db.EXPECT().Get(fullKey).Return(tt.dbCall.data, tt.dbCall.error) + got, err := ReadEpochBlockNumber(db, tt.args.epoch) + if (err != nil) != tt.wantErr { + t.Errorf("ReadEpochBlockNumber() error = %v, wantErr %v", err, tt.wantErr) + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("ReadEpochBlockNumber() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestWriteEpochBlockNumber(t *testing.T) { + type args struct { + epoch *big.Int + blockNum *big.Int + } + type dbCall struct { + key []byte + data []byte + error error + } + tests := []struct { + name string + args args + dbCall dbCall + wantErr bool + }{ + { + "0", + args{epoch: big.NewInt(0), blockNum: big.NewInt(0)}, + dbCall{ + key: []byte{}, + data: []byte{}, + error: nil, + }, + false, + }, + { + "1", + args{epoch: big.NewInt(1), blockNum: big.NewInt(100)}, + dbCall{ + key: []byte{0x01}, + data: []byte{0x64}, + error: nil, + }, + false, + }, + { + "1000", + args{epoch: big.NewInt(1000), blockNum: big.NewInt(100000)}, + dbCall{ + key: []byte{0x03, 0xe8}, + data: []byte{0x01, 0x86, 0xa0}, + error: nil, + }, + false, + }, + { + "error", + args{epoch: big.NewInt(1000), blockNum: big.NewInt(100000)}, + dbCall{ + key: []byte{0x03, 0xe8}, + data: []byte{0x01, 0x86, 0xa0}, + error: os.ErrClosed, + }, + true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + db := mock.NewMockDatabaseWriter(ctrl) + fullKey := append(epochBlockNumberPrefix, tt.dbCall.key...) + db.EXPECT().Put(fullKey, tt.dbCall.data).Return(tt.dbCall.error) + if err := WriteEpochBlockNumber(db, tt.args.epoch, tt.args.blockNum); (err != nil) != tt.wantErr { + t.Errorf("WriteEpochBlockNumber() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/core/rawdb/interfaces.go b/core/rawdb/interfaces.go index 3bdf55124..67161b424 100644 --- a/core/rawdb/interfaces.go +++ b/core/rawdb/interfaces.go @@ -16,6 +16,8 @@ package rawdb +//go:generate mockgen -source interfaces.go -destination mock/mock.go + // DatabaseReader wraps the Has and Get method of a backing data store. type DatabaseReader interface { Has(key []byte) (bool, error) diff --git a/core/rawdb/mock/mock.go b/core/rawdb/mock/mock.go new file mode 100644 index 000000000..edd02462b --- /dev/null +++ b/core/rawdb/mock/mock.go @@ -0,0 +1,137 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: interfaces.go + +// Package mock_rawdb is a generated GoMock package. +package mock_rawdb + +import ( + gomock "github.com/golang/mock/gomock" + reflect "reflect" +) + +// MockDatabaseReader is a mock of DatabaseReader interface +type MockDatabaseReader struct { + ctrl *gomock.Controller + recorder *MockDatabaseReaderMockRecorder +} + +// MockDatabaseReaderMockRecorder is the mock recorder for MockDatabaseReader +type MockDatabaseReaderMockRecorder struct { + mock *MockDatabaseReader +} + +// NewMockDatabaseReader creates a new mock instance +func NewMockDatabaseReader(ctrl *gomock.Controller) *MockDatabaseReader { + mock := &MockDatabaseReader{ctrl: ctrl} + mock.recorder = &MockDatabaseReaderMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockDatabaseReader) EXPECT() *MockDatabaseReaderMockRecorder { + return m.recorder +} + +// Has mocks base method +func (m *MockDatabaseReader) Has(key []byte) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Has", key) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Has indicates an expected call of Has +func (mr *MockDatabaseReaderMockRecorder) Has(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Has", reflect.TypeOf((*MockDatabaseReader)(nil).Has), key) +} + +// Get mocks base method +func (m *MockDatabaseReader) Get(key []byte) ([]byte, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Get", key) + ret0, _ := ret[0].([]byte) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Get indicates an expected call of Get +func (mr *MockDatabaseReaderMockRecorder) Get(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockDatabaseReader)(nil).Get), key) +} + +// MockDatabaseWriter is a mock of DatabaseWriter interface +type MockDatabaseWriter struct { + ctrl *gomock.Controller + recorder *MockDatabaseWriterMockRecorder +} + +// MockDatabaseWriterMockRecorder is the mock recorder for MockDatabaseWriter +type MockDatabaseWriterMockRecorder struct { + mock *MockDatabaseWriter +} + +// NewMockDatabaseWriter creates a new mock instance +func NewMockDatabaseWriter(ctrl *gomock.Controller) *MockDatabaseWriter { + mock := &MockDatabaseWriter{ctrl: ctrl} + mock.recorder = &MockDatabaseWriterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockDatabaseWriter) EXPECT() *MockDatabaseWriterMockRecorder { + return m.recorder +} + +// Put mocks base method +func (m *MockDatabaseWriter) Put(key, value []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Put", key, value) + ret0, _ := ret[0].(error) + return ret0 +} + +// Put indicates an expected call of Put +func (mr *MockDatabaseWriterMockRecorder) Put(key, value interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockDatabaseWriter)(nil).Put), key, value) +} + +// MockDatabaseDeleter is a mock of DatabaseDeleter interface +type MockDatabaseDeleter struct { + ctrl *gomock.Controller + recorder *MockDatabaseDeleterMockRecorder +} + +// MockDatabaseDeleterMockRecorder is the mock recorder for MockDatabaseDeleter +type MockDatabaseDeleterMockRecorder struct { + mock *MockDatabaseDeleter +} + +// NewMockDatabaseDeleter creates a new mock instance +func NewMockDatabaseDeleter(ctrl *gomock.Controller) *MockDatabaseDeleter { + mock := &MockDatabaseDeleter{ctrl: ctrl} + mock.recorder = &MockDatabaseDeleterMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockDatabaseDeleter) EXPECT() *MockDatabaseDeleterMockRecorder { + return m.recorder +} + +// Delete mocks base method +func (m *MockDatabaseDeleter) Delete(key []byte) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Delete", key) + ret0, _ := ret[0].(error) + return ret0 +} + +// Delete indicates an expected call of Delete +func (mr *MockDatabaseDeleterMockRecorder) Delete(key interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockDatabaseDeleter)(nil).Delete), key) +} diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index c7fa807dd..374be50d1 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -19,6 +19,7 @@ package rawdb import ( "encoding/binary" + "math/big" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/metrics" @@ -58,6 +59,10 @@ var ( preimagePrefix = []byte("secure-key-") // preimagePrefix + hash -> preimage configPrefix = []byte("ethereum-config-") // config prefix for the db + // epochBlockNumberPrefix + epoch (big.Int.Bytes()) + // -> epoch block number (big.Int.Bytes()) + epochBlockNumberPrefix = []byte("harmony-epoch-block-number-") + // Chain index prefixes (use `i` + single byte to avoid mixing data types). BloomBitsIndexPrefix = []byte("iB") // BloomBitsIndexPrefix is the data table of a chain indexer to track its progress @@ -135,6 +140,10 @@ func configKey(hash common.Hash) []byte { return append(configPrefix, hash.Bytes()...) } -func shardStateKey(number uint64, hash common.Hash) []byte { - return append(append(shardStatePrefix, encodeBlockNumber(number)...), hash.Bytes()...) +func shardStateKey(epoch *big.Int) []byte { + return append(shardStatePrefix, epoch.Bytes()...) +} + +func epochBlockNumberKey(epoch *big.Int) []byte { + return append(epochBlockNumberPrefix, epoch.Bytes()...) } diff --git a/core/resharding.go b/core/resharding.go index 1eb688071..bd9f5a87d 100644 --- a/core/resharding.go +++ b/core/resharding.go @@ -2,12 +2,15 @@ package core import ( "encoding/binary" + "math/big" "math/rand" "sort" "github.com/ethereum/go-ethereum/common" "github.com/harmony-one/bls/ffi/go/bls" + "github.com/harmony-one/harmony/contracts/structs" + "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils/contract" @@ -33,7 +36,7 @@ const ( type ShardingState struct { epoch uint64 // current epoch rnd uint64 // random seed for resharding - numShards int + numShards int // TODO ek – equal to len(shardState); remove this shardState types.ShardState } @@ -123,7 +126,6 @@ func (ss *ShardingState) Reshard(newNodeList []types.NodeID, percent float64) { } for i := 0; i < ss.numShards; i++ { ss.shardState[i].NodeList = append([]types.NodeID{leaders[i]}, ss.shardState[i].NodeList...) - ss.shardState[i].Leader = leaders[i] } } @@ -144,32 +146,50 @@ func GetBlockNumberFromEpoch(epoch uint64) uint64 { return number } +// GetLastBlockNumberFromEpoch calculates the last block number for the given +// epoch. TODO ek – this is a temp hack. +func GetLastBlockNumberFromEpoch(epoch uint64) uint64 { + return (epoch+1)*BlocksPerEpoch - 1 +} + // GetEpochFromBlockNumber calculates the epoch number the block belongs to func GetEpochFromBlockNumber(blockNumber uint64) uint64 { return blockNumber / uint64(BlocksPerEpoch) } // GetShardingStateFromBlockChain will retrieve random seed and shard map from beacon chain for given a epoch -func GetShardingStateFromBlockChain(bc *BlockChain, epoch uint64) *ShardingState { - number := GetBlockNumberFromEpoch(epoch) - shardState := bc.GetShardStateByNumber(number) +func GetShardingStateFromBlockChain(bc *BlockChain, epoch *big.Int) (*ShardingState, error) { + shardState, err := bc.ReadShardState(epoch) + if err != nil { + return nil, err + } + shardState = shardState.DeepCopy() - rndSeedBytes := bc.GetRandSeedByNumber(number) + blockNumber := GetBlockNumberFromEpoch(epoch.Uint64()) + rndSeedBytes := bc.GetRandSeedByNumber(blockNumber) rndSeed := binary.BigEndian.Uint64(rndSeedBytes[:]) - return &ShardingState{epoch: epoch, rnd: rndSeed, shardState: shardState, numShards: len(shardState)} + return &ShardingState{epoch: epoch.Uint64(), rnd: rndSeed, shardState: shardState, numShards: len(shardState)}, nil } // CalculateNewShardState get sharding state from previous epoch and calculate sharding state for new epoch -func CalculateNewShardState(bc *BlockChain, epoch uint64, stakeInfo *map[common.Address]*structs.StakeInfo) types.ShardState { - if epoch == GenesisEpoch { - return GetInitShardState() +func CalculateNewShardState( + bc *BlockChain, epoch *big.Int, + stakeInfo *map[common.Address]*structs.StakeInfo, +) (types.ShardState, error) { + if epoch.Cmp(big.NewInt(GenesisEpoch)) == 0 { + return GetInitShardState(), nil + } + prevEpoch := new(big.Int).Sub(epoch, common.Big1) + ss, err := GetShardingStateFromBlockChain(bc, prevEpoch) + if err != nil { + return nil, ctxerror.New("cannot retrieve previous sharding state"). + WithCause(err) } - ss := GetShardingStateFromBlockChain(bc, epoch-1) newNodeList := ss.UpdateShardingState(stakeInfo) utils.GetLogInstance().Info("Cuckoo Rate", "percentage", CuckooRate) ss.Reshard(newNodeList, CuckooRate) - return ss.shardState + return ss.shardState, nil } // UpdateShardingState remove the unstaked nodes and returns the newly staked node Ids. @@ -212,9 +232,6 @@ func GetInitShardState() types.ShardState { copy(pubKey[:], priKey.GetPublicKey().Serialize()[:]) // TODO: directly read address for bls too curNodeID := types.NodeID{contract.GenesisAccounts[index].Address, pubKey} - if j == 0 { - com.Leader = curNodeID - } com.NodeList = append(com.NodeList, curNodeID) } shardState = append(shardState, com) diff --git a/core/resharding_test.go b/core/resharding_test.go index 3a655dc1c..a99f512bd 100644 --- a/core/resharding_test.go +++ b/core/resharding_test.go @@ -123,9 +123,6 @@ func TestUpdateShardState(t *testing.T) { ss.Reshard(newNodeList, 0.2) assert.Equal(t, 6, ss.numShards) - for _, shard := range ss.shardState { - assert.Equal(t, shard.Leader.BlsPublicKey, shard.NodeList[0].BlsPublicKey) - } } func TestAssignNewNodes(t *testing.T) { diff --git a/core/state_processor.go b/core/state_processor.go index f009f53f4..c9ac56e00 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -24,6 +24,7 @@ import ( "github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/vm" + "github.com/harmony-one/harmony/internal/ctxerror" ) // StateProcessor is a basic Processor, which takes care of transitioning @@ -75,7 +76,10 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.DB, cfg vm.C allLogs = append(allLogs, receipt.Logs...) } // Finalize the block, applying any consensus engine specific extras (e.g. block rewards) - p.engine.Finalize(p.bc, header, statedb, block.Transactions(), receipts) + _, err := p.engine.Finalize(p.bc, header, statedb, block.Transactions(), receipts) + if err != nil { + return nil, nil, 0, ctxerror.New("cannot finalize block").WithCause(err) + } return receipts, allLogs, *usedGas, nil } diff --git a/core/types/block.go b/core/types/block.go index 0961a016e..f7db428f7 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" "golang.org/x/crypto/sha3" @@ -84,6 +85,7 @@ type Header struct { MixDigest common.Hash `json:"mixHash" gencodec:"required"` Nonce BlockNonce `json:"nonce" gencodec:"required"` // Additional Fields + Epoch *big.Int `json:"epoch" gencodec:"required"` ShardID uint32 `json:"shardID" gencodec:"required"` PrepareSignature [48]byte `json:"prepareSignature" gencodec:"required"` PrepareBitmap []byte `json:"prepareBitmap" gencodec:"required"` // Contains which validator signed @@ -92,6 +94,7 @@ type Header struct { RandPreimage [32]byte `json:"randPreimage"` RandSeed [32]byte `json:"randSeed"` ShardStateHash common.Hash `json:"shardStateRoot"` + ShardState ShardState `json:"shardState"` } // field type overrides for gencodec @@ -117,6 +120,16 @@ func (h *Header) Size() common.StorageSize { return common.StorageSize(unsafe.Sizeof(*h)) + common.StorageSize(len(h.Extra)+(h.Difficulty.BitLen()+h.Number.BitLen()+h.Time.BitLen())/8) } +// Logger returns a sub-logger with block contexts added. +func (h *Header) Logger(logger log.Logger) log.Logger { + return logger.New( + "blockHash", h.Hash(), + "blockShard", h.ShardID, + "blockEpoch", h.Epoch, + "blockNumber", h.Number, + ) +} + func rlpHash(x interface{}) (h common.Hash) { hw := sha3.NewLegacyKeccak256() rlp.Encode(hw, x) @@ -417,9 +430,10 @@ func (b *Block) WithBody(transactions []*Transaction, uncles []*Header) *Block { // Hash returns the keccak256 hash of b's header. // The hash is computed on the first call and cached thereafter. func (b *Block) Hash() common.Hash { - if hash := b.hash.Load(); hash != nil { - return hash.(common.Hash) - } + //if hash := b.hash.Load(); hash != nil { + // return hash.(common.Hash) + //} + //b.Logger(utils.GetLogger()).Debug("finalizing and caching block hash") v := b.header.Hash() b.hash.Store(v) return v @@ -475,7 +489,16 @@ func (b *Block) AddRandPreimage(pRnd [32]byte) { b.header.RandPreimage = pRnd } -// AddShardStateHash add shardStateHash into block header -func (b *Block) AddShardStateHash(shardStateHash common.Hash) { - b.header.ShardStateHash = shardStateHash +// AddShardState add shardState into block header +func (b *Block) AddShardState(shardState ShardState) { + // Make a copy because ShardState.Hash() internally sorts entries. + // Store the sorted copy. + shardState = append(shardState[:0:0], shardState...) + b.header.ShardStateHash = shardState.Hash() + b.header.ShardState = shardState +} + +// Logger returns a sub-logger with block contexts added. +func (b *Block) Logger(logger log.Logger) log.Logger { + return b.header.Logger(logger) } diff --git a/core/types/shard_state.go b/core/types/shard_state.go index 7a98ed1ea..c75eda613 100644 --- a/core/types/shard_state.go +++ b/core/types/shard_state.go @@ -22,6 +22,26 @@ type EpochShardState struct { // ShardState is the collection of all committees type ShardState []Committee +// FindCommitteeByID returns the committee configuration for the given shard, +// or nil if the given shard is not found. +func (ss ShardState) FindCommitteeByID(shardID uint32) *Committee { + for _, committee := range ss { + if committee.ShardID == shardID { + return &committee + } + } + return nil +} + +// DeepCopy returns a deep copy of the receiver. +func (ss ShardState) DeepCopy() ShardState { + var r ShardState + for _, c := range ss { + r = append(r, c.DeepCopy()) + } + return r +} + // CompareShardState compares two ShardState instances. func CompareShardState(s1, s2 ShardState) int { commonLen := len(s1) @@ -92,6 +112,11 @@ func CompareNodeID(id1, id2 *NodeID) int { // NodeIDList is a list of NodeIDList. type NodeIDList []NodeID +// DeepCopy returns a deep copy of the receiver. +func (l NodeIDList) DeepCopy() NodeIDList { + return append(l[:0:0], l...) +} + // CompareNodeIDList compares two node ID lists. func CompareNodeIDList(l1, l2 NodeIDList) int { commonLen := len(l1) @@ -115,10 +140,16 @@ func CompareNodeIDList(l1, l2 NodeIDList) int { // Committee contains the active nodes in one shard type Committee struct { ShardID uint32 - Leader NodeID NodeList NodeIDList } +// DeepCopy returns a deep copy of the receiver. +func (c Committee) DeepCopy() Committee { + r := c + r.NodeList = r.NodeList.DeepCopy() + return r +} + // CompareCommittee compares two committees and their leader/node list. func CompareCommittee(c1, c2 *Committee) int { switch { @@ -127,9 +158,6 @@ func CompareCommittee(c1, c2 *Committee) int { case c1.ShardID > c2.ShardID: return +1 } - if c := CompareNodeID(&c1.Leader, &c2.Leader); c != 0 { - return c - } if c := CompareNodeIDList(c1.NodeList, c2.NodeList); c != 0 { return c } @@ -156,6 +184,8 @@ func GetHashFromNodeList(nodeList []NodeID) []byte { // Hash is the root hash of ShardState func (ss ShardState) Hash() (h common.Hash) { + // TODO ek – this sorting really doesn't belong here; it should instead + // be made an explicit invariant to be maintained and, if needed, checked. sort.Slice(ss, func(i, j int) bool { return ss[i].ShardID < ss[j].ShardID }) diff --git a/core/types/shard_state_test.go b/core/types/shard_state_test.go index 871dbabf2..0b13e2260 100644 --- a/core/types/shard_state_test.go +++ b/core/types/shard_state_test.go @@ -49,7 +49,6 @@ func TestGetHashFromNodeList(t *testing.T) { func TestHash(t *testing.T) { com1 := Committee{ ShardID: 22, - Leader: NodeID{"node11", blsPubKey11}, NodeList: []NodeID{ {"node11", blsPubKey11}, {"node22", blsPubKey22}, @@ -58,7 +57,6 @@ func TestHash(t *testing.T) { } com2 := Committee{ ShardID: 2, - Leader: NodeID{"node4", blsPubKey4}, NodeList: []NodeID{ {"node4", blsPubKey4}, {"node5", blsPubKey5}, @@ -70,7 +68,6 @@ func TestHash(t *testing.T) { com3 := Committee{ ShardID: 2, - Leader: NodeID{"node4", blsPubKey4}, NodeList: []NodeID{ {"node6", blsPubKey6}, {"node5", blsPubKey5}, @@ -79,7 +76,6 @@ func TestHash(t *testing.T) { } com4 := Committee{ ShardID: 22, - Leader: NodeID{"node11", blsPubKey11}, NodeList: []NodeID{ {"node1", blsPubKey1}, {"node11", blsPubKey11}, diff --git a/crypto/bls/bls.go b/crypto/bls/bls.go index a387dcd1c..7e6f90497 100644 --- a/crypto/bls/bls.go +++ b/crypto/bls/bls.go @@ -2,9 +2,10 @@ package bls import ( "errors" - "fmt" "github.com/harmony-one/bls/ffi/go/bls" + + "github.com/harmony-one/harmony/internal/ctxerror" ) func init() { @@ -84,7 +85,9 @@ func (m *Mask) Len() int { // cosigners 0-7, bits 0-7 of byte 1 correspond to cosigners 8-15, etc. func (m *Mask) SetMask(mask []byte) error { if m.Len() != len(mask) { - return fmt.Errorf("mismatching Bitmap lengths") + return ctxerror.New("mismatching bitmap lengths", + "expectedBitmapLength", m.Len(), + "providedBitmapLength", len(mask)) } for i := range m.publics { byt := i >> 3 diff --git a/go.mod b/go.mod index 171f929b0..566d63086 100644 --- a/go.mod +++ b/go.mod @@ -41,15 +41,16 @@ require ( github.com/multiformats/go-multiaddr v0.0.2 github.com/multiformats/go-multiaddr-net v0.0.1 github.com/pborman/uuid v1.2.0 + github.com/pkg/errors v0.8.1 github.com/rjeczalik/notify v0.9.2 github.com/rs/cors v1.6.0 // indirect github.com/shirou/gopsutil v2.18.12+incompatible github.com/smartystreets/goconvey v0.0.0-20190330032615-68dc04aab96a // indirect github.com/stretchr/testify v1.3.0 + github.com/syndtr/goleveldb v1.0.0 github.com/whyrusleeping/go-logging v0.0.0-20170515211332-0457bb6b88fc golang.org/x/crypto v0.0.0-20190313024323-a1f597ede03a golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 - golang.org/x/net v0.0.0-20190313220215-9f648a60d977 golang.org/x/tools v0.0.0-20190328211700-ab21143f2384 google.golang.org/grpc v1.19.0 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 diff --git a/internal/configs/node/config.go b/internal/configs/node/config.go index ec7da09e1..3a06196d6 100644 --- a/internal/configs/node/config.go +++ b/internal/configs/node/config.go @@ -9,10 +9,10 @@ import ( "fmt" "sync" - "github.com/ethereum/go-ethereum/ethdb" "github.com/harmony-one/bls/ffi/go/bls" - "github.com/harmony-one/harmony/p2p" p2p_crypto "github.com/libp2p/go-libp2p-crypto" + + "github.com/harmony-one/harmony/p2p" ) // Role defines a role of a node. @@ -76,8 +76,9 @@ type ConfigType struct { P2pPriKey p2p_crypto.PrivKey ConsensusPriKey *bls.SecretKey ConsensusPubKey *bls.PublicKey - MainDB *ethdb.LDBDatabase - BeaconDB *ethdb.LDBDatabase + + // Database directory + DBDir string SelfPeer p2p.Peer Leader p2p.Peer diff --git a/internal/shardchain/dbfactory.go b/internal/shardchain/dbfactory.go new file mode 100644 index 000000000..8c95a7cfa --- /dev/null +++ b/internal/shardchain/dbfactory.go @@ -0,0 +1,34 @@ +package shardchain + +import ( + "fmt" + "path" + + "github.com/ethereum/go-ethereum/ethdb" +) + +// DBFactory is a blockchain database factory. +type DBFactory interface { + // NewChainDB returns a new database for the blockchain for + // given shard. + NewChainDB(shardID uint32) (ethdb.Database, error) +} + +// LDBFactory is a LDB-backed blockchain database factory. +type LDBFactory struct { + RootDir string // directory in which to put shard databases in. +} + +// NewChainDB returns a new LDB for the blockchain for given shard. +func (f *LDBFactory) NewChainDB(shardID uint32) (ethdb.Database, error) { + dir := path.Join(f.RootDir, fmt.Sprintf("harmony_db_%d", shardID)) + return ethdb.NewLDBDatabase(dir, 0, 0) +} + +// MemDBFactory is a memory-backed blockchain database factory. +type MemDBFactory struct{} + +// NewChainDB returns a new memDB for the blockchain for given shard. +func (f *MemDBFactory) NewChainDB(shardID uint32) (ethdb.Database, error) { + return ethdb.NewMemDatabase(), nil +} diff --git a/internal/shardchain/dbinit.go b/internal/shardchain/dbinit.go new file mode 100644 index 000000000..4987a7886 --- /dev/null +++ b/internal/shardchain/dbinit.go @@ -0,0 +1,8 @@ +package shardchain + +import "github.com/ethereum/go-ethereum/ethdb" + +// DBInitializer initializes a newly created chain database. +type DBInitializer interface { + InitChainDB(db ethdb.Database, shardID uint32) error +} diff --git a/internal/shardchain/shardchains.go b/internal/shardchain/shardchains.go new file mode 100644 index 000000000..e1705190a --- /dev/null +++ b/internal/shardchain/shardchains.go @@ -0,0 +1,136 @@ +package shardchain + +import ( + "math/big" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/params" + + "github.com/harmony-one/harmony/consensus/engine" + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/core/rawdb" + "github.com/harmony-one/harmony/core/vm" + "github.com/harmony-one/harmony/internal/ctxerror" + "github.com/harmony-one/harmony/internal/utils" +) + +// Collection is a collection of per-shard blockchains. +type Collection interface { + // ShardChain returns the blockchain for the given shard, + // opening one as necessary. + ShardChain(shardID uint32) (*core.BlockChain, error) + + // CloseShardChain closes the given shard chain. + CloseShardChain(shardID uint32) error + + // Close closes all shard chains. + Close() error +} + +// CollectionImpl is the main implementation of the shard chain collection. +// See the Collection interface for details. +type CollectionImpl struct { + dbFactory DBFactory + dbInit DBInitializer + engine engine.Engine + mtx sync.Mutex + pool map[uint32]*core.BlockChain +} + +// NewCollection creates and returns a new shard chain collection. +// +// dbFactory is the shard chain database factory to use. +// +// dbInit is the shard chain initializer to use when the database returned by +// the factory is brand new (empty). +func NewCollection( + dbFactory DBFactory, dbInit DBInitializer, engine engine.Engine, +) *CollectionImpl { + return &CollectionImpl{ + dbFactory: dbFactory, + dbInit: dbInit, + engine: engine, + pool: make(map[uint32]*core.BlockChain), + } +} + +// ShardChain returns the blockchain for the given shard, +// opening one as necessary. +func (sc *CollectionImpl) ShardChain(shardID uint32) (*core.BlockChain, error) { + sc.mtx.Lock() + defer sc.mtx.Unlock() + if bc, ok := sc.pool[shardID]; ok { + return bc, nil + } + var db ethdb.Database + defer func() { + if db != nil { + db.Close() + } + }() + var err error + if db, err = sc.dbFactory.NewChainDB(shardID); err != nil { + // NewChainDB may return incompletely initialized DB; + // avoid closing it. + db = nil + return nil, ctxerror.New("cannot open chain database").WithCause(err) + } + if rawdb.ReadCanonicalHash(db, 0) == (common.Hash{}) { + utils.GetLogger().Info("initializing a new chain database", + "shardID", shardID) + if err := sc.dbInit.InitChainDB(db, shardID); err != nil { + return nil, ctxerror.New("cannot initialize a new chain database"). + WithCause(err) + } + } + var cacheConfig *core.CacheConfig + // TODO ek – archival + if false { + cacheConfig = &core.CacheConfig{Disabled: true} + } + chainConfig := *params.TestChainConfig + chainConfig.ChainID = big.NewInt(int64(shardID)) + bc, err := core.NewBlockChain( + db, cacheConfig, &chainConfig, sc.engine, vm.Config{}, nil, + ) + if err != nil { + return nil, ctxerror.New("cannot create blockchain").WithCause(err) + } + db = nil // don't close + sc.pool[shardID] = bc + return bc, nil +} + +// CloseShardChain closes the given shard chain. +func (sc *CollectionImpl) CloseShardChain(shardID uint32) error { + sc.mtx.Lock() + defer sc.mtx.Unlock() + bc, ok := sc.pool[shardID] + if !ok { + return ctxerror.New("shard chain not found", "shardID", shardID) + } + utils.GetLogger().Info("closing shard chain", "shardID", shardID) + delete(sc.pool, shardID) + bc.Stop() + bc.ChainDb().Close() + utils.GetLogger().Info("closed shard chain", "shardID", shardID) + return nil +} + +// Close closes all shard chains. +func (sc *CollectionImpl) Close() error { + newPool := make(map[uint32]*core.BlockChain) + sc.mtx.Lock() + oldPool := sc.pool + sc.pool = newPool + sc.mtx.Unlock() + for shardID, bc := range oldPool { + utils.GetLogger().Info("closing shard chain", "shardID", shardID) + bc.Stop() + bc.ChainDb().Close() + utils.GetLogger().Info("closed shard chain", "shardID", shardID) + } + return nil +} diff --git a/node/contract.go b/node/contract.go index 0a4a4c807..c056ed374 100644 --- a/node/contract.go +++ b/node/contract.go @@ -85,7 +85,7 @@ func (node *Node) QueryStakeInfo() *structs.StakeInfoReturnValue { priKey := contract_constants.GenesisBeaconAccountPriKey deployerAddress := crypto.PubkeyToAddress(priKey.PublicKey) - state, err := node.blockchain.State() + state, err := node.Blockchain().State() if err != nil { utils.GetLogInstance().Error("Failed to get blockchain state", "error", err) @@ -131,7 +131,7 @@ func (node *Node) getDeployedStakingContract() common.Address { // GetNonceOfAddress returns nonce of an address. func (node *Node) GetNonceOfAddress(address common.Address) uint64 { - state, err := node.blockchain.State() + state, err := node.Blockchain().State() if err != nil { log.Error("Failed to get chain state", "Error", err) return 0 @@ -141,7 +141,7 @@ func (node *Node) GetNonceOfAddress(address common.Address) uint64 { // GetBalanceOfAddress returns balance of an address. func (node *Node) GetBalanceOfAddress(address common.Address) (*big.Int, error) { - state, err := node.blockchain.State() + state, err := node.Blockchain().State() if err != nil { log.Error("Failed to get chain state", "Error", err) return nil, err diff --git a/node/contract_test.go b/node/contract_test.go index da6e49ec7..92c624bf1 100644 --- a/node/contract_test.go +++ b/node/contract_test.go @@ -23,7 +23,7 @@ func prepareNode(t *testing.T) *Node { if err != nil { t.Fatalf("Cannot craeate consensus: %v", err) } - return New(host, consensus, nil, false) + return New(host, consensus, testDBFactory, false) } diff --git a/node/node.go b/node/node.go index 7861a1eb7..b472b9874 100644 --- a/node/node.go +++ b/node/node.go @@ -2,17 +2,17 @@ package node import ( "crypto/ecdsa" + "encoding/hex" "fmt" "math/big" - "os" "sync" "time" - "github.com/harmony-one/bls/ffi/go/bls" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" + "github.com/harmony-one/bls/ffi/go/bls" + "github.com/harmony-one/harmony/accounts" "github.com/harmony-one/harmony/api/client" clientService "github.com/harmony-one/harmony/api/client/service" @@ -25,10 +25,11 @@ import ( "github.com/harmony-one/harmony/contracts/structs" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" - "github.com/harmony-one/harmony/core/vm" "github.com/harmony-one/harmony/crypto/pki" "github.com/harmony-one/harmony/drand" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" + "github.com/harmony-one/harmony/internal/ctxerror" + "github.com/harmony-one/harmony/internal/shardchain" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/p2p" @@ -94,9 +95,8 @@ type Node struct { pendingTxMutex sync.Mutex DRand *drand.DRand // The instance for distributed randomness protocol - blockchain *core.BlockChain // The blockchain for the shard where this node belongs - beaconChain *core.BlockChain // The blockchain for beacon chain. - db *ethdb.LDBDatabase // LevelDB to store blockchain. + // Shard databases + shardChains shardchain.Collection ClientPeer *p2p.Peer // The peer for the harmony tx generator client, used for leaders to return proof-of-accept Client *client.Client // The presence of a client object means this node will also act as a client @@ -148,8 +148,9 @@ type Node struct { // TODO: leochen, can we use multiple account for staking? StakingAccount accounts.Account - // For test only - TestBankKeys []*ecdsa.PrivateKey + // For test only; TODO ek – remove this + TestBankKeys []*ecdsa.PrivateKey + ContractDeployerKey *ecdsa.PrivateKey ContractDeployerCurrentNonce uint64 // The nonce of the deployer contract at current block ContractAddresses []common.Address @@ -183,11 +184,41 @@ type Node struct { ContractCaller *contracts.ContractCaller accountManager *accounts.Manager + + // Next shard state + nextShardState struct { + // The received master shard state + master *types.EpochShardState + + // When for a leader to propose the next shard state, + // or for a validator to wait for a proposal before view change. + // TODO ek – replace with retry-based logic instead of delay + proposeTime time.Time + } + + isFirstTime bool // the node was started with a fresh database } -// Blockchain returns the blockchain from node +// Blockchain returns the blockchain for the node's current shard. func (node *Node) Blockchain() *core.BlockChain { - return node.blockchain + shardID := node.Consensus.ShardID + bc, err := node.shardChains.ShardChain(shardID) + if err != nil { + err = ctxerror.New("cannot get shard chain", "shardID", shardID). + WithCause(err) + panic(err) //ctxerror.Log15(utils.GetLogger().Crit, err) + } + return bc +} + +// Beaconchain returns the beaconchain from node. +func (node *Node) Beaconchain() *core.BlockChain { + bc, err := node.shardChains.ShardChain(0) + if err != nil { + err = ctxerror.New("cannot get beaconchain").WithCause(err) + ctxerror.Log15(utils.GetLogger().Crit, err) + } + return bc } // Add new transactions to the pending transaction list @@ -226,7 +257,7 @@ func (node *Node) StartServer() { // Currently used for stats reporting purpose func (node *Node) countNumTransactionsInBlockchain() int { count := 0 - for block := node.blockchain.CurrentBlock(); block != nil; block = node.blockchain.GetBlockByHash(block.Header().ParentHash) { + for block := node.Blockchain().CurrentBlock(); block != nil; block = node.Blockchain().GetBlockByHash(block.Header().ParentHash) { count += len(block.Transactions()) } return count @@ -238,10 +269,8 @@ func (node *Node) GetSyncID() [SyncIDLength]byte { } // New creates a new node. -func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database, isArchival bool) *Node { - var chain *core.BlockChain +func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardchain.DBFactory, isArchival bool) *Node { var err error - var isFirstTime bool // if cannot get blockchain from database, then isFirstTime = true node := Node{} copy(node.syncID[:], GenerateRandomString(SyncIDLength)) @@ -250,39 +279,23 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database, is node.SelfPeer = host.GetSelfPeer() } + // Create test keys. Genesis will later need this. + node.TestBankKeys, err = CreateTestBankKeys(FakeAddressNumber) + if err != nil { + utils.GetLogInstance().Crit("Error while creating test keys", + "error", err) + } + + node.shardChains = shardchain.NewCollection( + chainDBFactory, &genesisInitializer{&node}, consensusObj) + if host != nil && consensusObj != nil { // Consensus and associated channel to communicate blocks node.Consensus = consensusObj - // TODO(ricl): placeholder. Set the account manager to node.accountManager - // // Ensure that the AccountManager method works before the node has started. - // // We rely on this in cmd/geth. - // am, ephemeralKeystore, err := makeAccountManager(conf) - // if err != nil { - // return nil, err - // } - // node.accountManager = am - - // Init db - database := db - if database == nil { - database = ethdb.NewMemDatabase() - chain, err = node.GenesisBlockSetup(database, consensusObj.ShardID, false) - isFirstTime = true - } else { - chain, err = node.InitBlockChainFromDB(db, node.Consensus, isArchival) - isFirstTime = false - if err != nil || chain == nil || chain.CurrentBlock().NumberU64() <= 0 { - chain, err = node.GenesisBlockSetup(database, consensusObj.ShardID, isArchival) - isFirstTime = true - } - } - if err != nil { - utils.GetLogInstance().Error("Error when setup blockchain", "err", err) - os.Exit(1) - } - - node.blockchain = chain + // Load the chains. + chain := node.Blockchain() // this also sets node.isFirstTime if the DB is fresh + _ = node.Beaconchain() node.BlockChannel = make(chan *types.Block) node.ConfirmedBlockChannel = make(chan *types.Block) @@ -296,7 +309,7 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database, is // Add Faucet contract to all shards, so that on testnet, we can demo wallet in explorer // TODO (leo): we need to have support of cross-shard tx later so that the token can be transferred from beacon chain shard to other tx shards. - if isFirstTime { + if node.isFirstTime { // Setup one time smart contracts node.AddFaucetContractToPendingTransactions() } else { @@ -305,7 +318,7 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database, is if node.Consensus.ShardID == 0 { // Contracts only exist in beacon chain - if isFirstTime { + if node.isFirstTime { // Setup one time smart contracts node.CurrentStakes = make(map[common.Address]*structs.StakeInfo) node.AddStakingContractToPendingTransactions() //This will save the latest information about staked nodes in current staked @@ -313,7 +326,7 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database, is node.AddContractKeyAndAddress(scStaking) } } - if isFirstTime { + if node.isFirstTime { // TODO(minhdoan): Think of a better approach to deploy smart contract. // This is temporary for demo purpose. node.AddLotteryContract() @@ -324,7 +337,7 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database, is } } - node.ContractCaller = contracts.NewContractCaller(&db, node.blockchain, params.TestChainConfig) + node.ContractCaller = contracts.NewContractCaller(node.Blockchain(), params.TestChainConfig) if consensusObj != nil && nodeconfig.GetDefaultConfig().IsLeader() { node.State = NodeLeader @@ -359,40 +372,47 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database, is } // InitShardState initialize genesis shard state and update committee pub keys for consensus and drand -func (node *Node) InitShardState(isGenesis bool) { - shardState := types.ShardState{} - if isGenesis { - // Store the genesis shard state into db. - if node.Consensus != nil { - if node.Consensus.ShardID == 0 { - shardState = node.blockchain.StoreNewShardState(node.blockchain.CurrentBlock(), nil) - } else { - shardState = node.beaconChain.StoreNewShardState(node.beaconChain.CurrentBlock(), nil) - } - } - } else { - epochShardState, err := node.retrieveEpochShardState() - if err != nil { - utils.GetLogInstance().Error("[Shard State] Failed to decode epoch shard state", "error", err) - } - utils.GetLogInstance().Info("Successfully loaded epoch shard state") - shardState = epochShardState.ShardState +func (node *Node) InitShardState(isGenesis bool) (err error) { + logger := utils.GetLogInstance().New("isGenesis", isGenesis) + getLogger := func() log.Logger { return utils.WithCallerSkip(logger, 1) } + if node.Consensus == nil { + getLogger().Crit("consensus is nil; cannot figure out shard ID") + } + shardID := node.Consensus.ShardID + logger = logger.New("shardID", shardID) + getLogger().Info("initializing shard state") + + // Get genesis epoch shard state from chain + genesisEpoch := big.NewInt(core.GenesisEpoch) + shardState, err := node.Beaconchain().GetShardState(genesisEpoch, nil) + if err != nil { + return ctxerror.New("cannot read genesis shard state").WithCause(err) } + getLogger().Info("Successfully loaded epoch shard state") // Update validator public keys - for _, shard := range shardState { - if shard.ShardID == node.Consensus.ShardID { - pubKeys := []*bls.PublicKey{} - for _, node := range shard.NodeList { - blsPubKey := &bls.PublicKey{} - blsPubKey.Deserialize(node.BlsPublicKey[:]) - pubKeys = append(pubKeys, blsPubKey) - } - node.Consensus.UpdatePublicKeys(pubKeys) - node.DRand.UpdatePublicKeys(pubKeys) - break + committee := shardState.FindCommitteeByID(shardID) + if committee == nil { + return ctxerror.New("our shard is not found in genesis shard state", + "shardID", shardID) + } + pubKeys := []*bls.PublicKey{} + for _, node := range committee.NodeList { + pubKey := &bls.PublicKey{} + pubKeyBytes := node.BlsPublicKey[:] + err = pubKey.Deserialize(pubKeyBytes) + if err != nil { + return ctxerror.New("cannot deserialize BLS public key", + "shardID", shardID, + "pubKeyBytes", hex.EncodeToString(pubKeyBytes), + ).WithCause(err) } + pubKeys = append(pubKeys, pubKey) } + getLogger().Info("initialized shard state", "numPubKeys", len(pubKeys)) + node.Consensus.UpdatePublicKeys(pubKeys) + node.DRand.UpdatePublicKeys(pubKeys) + return nil } // AddPeers adds neighbors nodes @@ -466,33 +486,3 @@ func (node *Node) initNodeConfiguration() (service.NodeConfig, chan p2p.Peer) { return nodeConfig, chanPeer } - -// AddBeaconChainDatabase adds database support for beaconchain blocks on normal sharding nodes (not BeaconChain node) -func (node *Node) AddBeaconChainDatabase(db ethdb.Database) { - database := db - if database == nil { - database = ethdb.NewMemDatabase() - } - // TODO (chao) currently we use the same genesis block as normal shard - chain, err := node.GenesisBlockSetup(database, 0, true) - if err != nil { - utils.GetLogInstance().Error("Error when doing genesis setup") - os.Exit(1) - } - node.beaconChain = chain - node.BeaconWorker = worker.New(params.TestChainConfig, chain, &consensus.Consensus{}, pki.GetAddressFromPublicKey(node.SelfPeer.ConsensusPubKey), node.Consensus.ShardID) -} - -// InitBlockChainFromDB retrieves the latest blockchain and state available from the local database -func (node *Node) InitBlockChainFromDB(db ethdb.Database, consensus *consensus.Consensus, isArchival bool) (*core.BlockChain, error) { - chainConfig := params.TestChainConfig - if consensus != nil { - chainConfig.ChainID = big.NewInt(int64(consensus.ShardID)) // Use ChainID as piggybacked ShardID - } - cacheConfig := core.CacheConfig{} - if isArchival { - cacheConfig = core.CacheConfig{Disabled: true, TrieNodeLimit: 256 * 1024 * 1024, TrieTimeLimit: 30 * time.Second} - } - chain, err := core.NewBlockChain(db, &cacheConfig, chainConfig, consensus, vm.Config{}, nil) - return chain, err -} diff --git a/node/node_genesis.go b/node/node_genesis.go index fb0a97f9f..f87c597a3 100644 --- a/node/node_genesis.go +++ b/node/node_genesis.go @@ -5,14 +5,18 @@ import ( "math/big" "math/rand" "strings" - "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/params" + "github.com/pkg/errors" + "github.com/harmony-one/harmony/core" - "github.com/harmony-one/harmony/core/vm" + "github.com/harmony-one/harmony/core/rawdb" + "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/internal/ctxerror" + "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils/contract" ) @@ -25,8 +29,39 @@ const ( InitFreeFundInEther = 100 ) -// GenesisBlockSetup setups a genesis blockchain. -func (node *Node) GenesisBlockSetup(db ethdb.Database, shardID uint32, isArchival bool) (*core.BlockChain, error) { +// genesisInitializer is a shardchain.DBInitializer adapter. +type genesisInitializer struct { + node *Node +} + +// InitChainDB sets up a new genesis block in the database for the given shard. +func (gi *genesisInitializer) InitChainDB(db ethdb.Database, shardID uint32) error { + shardState := core.GetInitShardState() + if shardID != 0 { + // store only the local shard + c := shardState.FindCommitteeByID(shardID) + if c == nil { + return errors.New("cannot find local shard in genesis") + } + shardState = types.ShardState{*c} + } + if err := rawdb.WriteShardState(db, common.Big0, shardState); err != nil { + return ctxerror.New("cannot store epoch shard state").WithCause(err) + } + if err := gi.node.SetupGenesisBlock(db, shardID); err != nil { + return ctxerror.New("cannot setup genesis block").WithCause(err) + } + return nil +} + +// SetupGenesisBlock sets up a genesis blockchain. +func (node *Node) SetupGenesisBlock(db ethdb.Database, shardID uint32) error { + utils.GetLogger().Info("setting up a brand new chain database", + "shardID", shardID) + if shardID == node.Consensus.ShardID { + node.isFirstTime = true + } + // Initialize genesis block and blockchain // Tests account for txgen to use @@ -54,40 +89,45 @@ func (node *Node) GenesisBlockSetup(db ethdb.Database, shardID uint32, isArchiva chainConfig := *params.TestChainConfig chainConfig.ChainID = big.NewInt(int64(shardID)) // Use ChainID as piggybacked ShardID gspec := core.Genesis{ - Config: &chainConfig, - Alloc: genesisAlloc, - ShardID: shardID, - ShardStateHash: core.GetInitShardState().Hash(), + Config: &chainConfig, + Alloc: genesisAlloc, + ShardID: shardID, } // Store genesis block into db. - gspec.MustCommit(db) - cacheConfig := core.CacheConfig{} - if isArchival { - cacheConfig = core.CacheConfig{Disabled: true, TrieNodeLimit: 256 * 1024 * 1024, TrieTimeLimit: 30 * time.Second} + _, err := gspec.Commit(db) + + return err +} + +// CreateTestBankKeys deterministically generates testing addresses. +func CreateTestBankKeys(numAddresses int) (keys []*ecdsa.PrivateKey, err error) { + rand.Seed(0) + bytes := make([]byte, 1000000) + for i := range bytes { + bytes[i] = byte(rand.Intn(100)) + } + reader := strings.NewReader(string(bytes)) + for i := 0; i < numAddresses; i++ { + key, err := ecdsa.GenerateKey(crypto.S256(), reader) + if err != nil { + return nil, err + } + keys = append(keys, key) } - return core.NewBlockChain(db, &cacheConfig, gspec.Config, node.Consensus, vm.Config{}, nil) + return keys, nil } // CreateGenesisAllocWithTestingAddresses create the genesis block allocation that contains deterministically // generated testing addresses with tokens. This is mostly used for generated simulated transactions in txgen. // TODO: Remove it later when moving to production. func (node *Node) CreateGenesisAllocWithTestingAddresses(numAddress int) core.GenesisAlloc { - rand.Seed(0) - len := 1000000 - bytes := make([]byte, len) - for i := 0; i < len; i++ { - bytes[i] = byte(rand.Intn(100)) - } - reader := strings.NewReader(string(bytes)) genesisAloc := make(core.GenesisAlloc) - for i := 0; i < numAddress; i++ { - testBankKey, _ := ecdsa.GenerateKey(crypto.S256(), reader) + for _, testBankKey := range node.TestBankKeys { testBankAddress := crypto.PubkeyToAddress(testBankKey.PublicKey) testBankFunds := big.NewInt(InitFreeFundInEther) testBankFunds = testBankFunds.Mul(testBankFunds, big.NewInt(params.Ether)) genesisAloc[testBankAddress] = core.GenesisAccount{Balance: testBankFunds} - node.TestBankKeys = append(node.TestBankKeys, testBankKey) } return genesisAloc } diff --git a/node/node_handler.go b/node/node_handler.go index 1727c2914..54e7358a8 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -3,11 +3,10 @@ package node import ( "bytes" "context" - "encoding/gob" "encoding/hex" - "fmt" - "io/ioutil" + "errors" "math" + "math/big" "os" "os/exec" "strconv" @@ -15,10 +14,9 @@ import ( "syscall" "time" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" - - "github.com/harmony-one/harmony/core" - + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" pb "github.com/golang/protobuf/proto" "github.com/harmony-one/bls/ffi/go/bls" @@ -28,9 +26,12 @@ import ( "github.com/harmony-one/harmony/api/proto/message" proto_node "github.com/harmony-one/harmony/api/proto/node" "github.com/harmony-one/harmony/api/service" + "github.com/harmony-one/harmony/contracts/structs" + "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/crypto/pki" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" + "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/host" @@ -177,7 +178,9 @@ func (node *Node) messageHandler(content []byte, sender string) { case proto_node.PONG: node.pongMessageHandler(msgPayload) case proto_node.ShardState: - node.epochShardStateMessageHandler(msgPayload) + if err := node.epochShardStateMessageHandler(msgPayload); err != nil { + ctxerror.Log15(utils.GetLogger().Warn, err) + } } default: utils.GetLogInstance().Error("Unknown", "MsgCategory", msgCategory) @@ -248,21 +251,127 @@ func (node *Node) BroadcastNewBlock(newBlock *types.Block) { } // VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on -func (node *Node) VerifyNewBlock(newBlock *types.Block) bool { - err := node.blockchain.ValidateNewBlock(newBlock, pki.GetAddressFromPublicKey(node.SelfPeer.ConsensusPubKey)) +func (node *Node) VerifyNewBlock(newBlock *types.Block) error { + // TODO ek – where do we verify parent-child invariants, + // e.g. "child.Number == child.IsGenesis() ? 0 : parent.Number+1"? + err := node.Blockchain().ValidateNewBlock(newBlock, pki.GetAddressFromPublicKey(node.SelfPeer.ConsensusPubKey)) if err != nil { - utils.GetLogInstance().Debug("Failed verifying new block", "Error", err, "tx", newBlock.Transactions()[0]) - return false + return ctxerror.New("cannot ValidateNewBlock", + "blockHash", newBlock.Hash(), + "numTx", len(newBlock.Transactions()), + ).WithCause(err) } // TODO: verify the vrf randomness _ = newBlock.Header().RandPreimage - err = node.blockchain.ValidateNewShardState(newBlock, &node.CurrentStakes) + err = node.validateNewShardState(newBlock, &node.CurrentStakes) if err != nil { - utils.GetLogInstance().Debug("Failed to verify new sharding state", "err", err) + return ctxerror.New("failed to verify sharding state").WithCause(err) } - return true + return nil +} + +// BigMaxUint64 is maximum possible uint64 value, that is, (1**64)-1. +var BigMaxUint64 = new(big.Int).SetBytes([]byte{ + 255, 255, 255, 255, 255, 255, 255, 255, +}) + +// validateNewShardState validate whether the new shard state root matches +func (node *Node) validateNewShardState(block *types.Block, stakeInfo *map[common.Address]*structs.StakeInfo) error { + // Common case first – blocks without resharding proposal + header := block.Header() + if header.ShardStateHash == (common.Hash{}) { + // No new shard state was proposed + if block.ShardID() == 0 { + if core.IsEpochLastBlock(block) { + // TODO ek - invoke view change + return errors.New("beacon leader did not propose resharding") + } + } else { + if node.nextShardState.master != nil && + !time.Now().Before(node.nextShardState.proposeTime) { + // TODO ek – invoke view change + return errors.New("regular leader did not propose resharding") + } + } + // We aren't expecting to reshard, so proceed to sign + return nil + } + proposed := header.ShardState + if block.ShardID() == 0 { + // Beacon validators independently recalculate the master state and + // compare it against the proposed copy. + nextEpoch := new(big.Int).Add(block.Header().Epoch, common.Big1) + // TODO ek – this may be called from regular shards, + // for vetting beacon chain blocks received during block syncing. + // DRand may or or may not get in the way. Test this out. + expected, err := core.CalculateNewShardState( + node.Blockchain(), nextEpoch, stakeInfo) + if err != nil { + return ctxerror.New("cannot calculate expected shard state"). + WithCause(err) + } + if types.CompareShardState(expected, proposed) != 0 { + // TODO ek – log state proposal differences + // TODO ek – this error should trigger view change + err := errors.New("shard state proposal is different from expected") + // TODO ek/chao – calculated shard state is different even with the + // same input, i.e. it is nondeterministic. + // Don't treat this as a blocker until we fix the nondeterminism. + //return err + ctxerror.Log15(utils.GetLogger().Warn, err) + } + } else { + // Regular validators fetch the local-shard copy on the beacon chain + // and compare it against the proposed copy. + // + // We trust the master proposal in our copy of beacon chain. + // The sanity check for the master proposal is done earlier, + // when the beacon block containing the master proposal is received + // and before it is admitted into the local beacon chain. + // + // TODO ek – fetch masterProposal from beaconchain instead + masterProposal := node.nextShardState.master.ShardState + expected := masterProposal.FindCommitteeByID(block.ShardID()) + switch len(proposed) { + case 0: + // Proposal to discontinue shard + if expected != nil { + // TODO ek – invoke view change + return errors.New( + "leader proposed to disband against beacon decision") + } + case 1: + // Proposal to continue shard + proposed := proposed[0] + // Sanity check: Shard ID should match + if proposed.ShardID != block.ShardID() { + // TODO ek – invoke view change + return ctxerror.New("proposal has incorrect shard ID", + "proposedShard", proposed.ShardID, + "blockShard", block.ShardID()) + } + // Did beaconchain say we are no more? + if expected == nil { + // TODO ek – invoke view change + return errors.New( + "leader proposed to continue against beacon decision") + } + // Did beaconchain say the same proposal? + if types.CompareCommittee(expected, &proposed) != 0 { + // TODO ek – log differences + // TODO ek – invoke view change + return errors.New("proposal differs from one in beacon chain") + } + default: + // TODO ek – invoke view change + return ctxerror.New( + "regular resharding proposal has incorrect number of shards", + "numShards", len(proposed)) + } + } + return nil } // PostConsensusProcessing is called by consensus participants, after consensus is done, to: @@ -315,29 +424,37 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) { // TODO: update staking information once per epoch. node.UpdateStakingList(node.QueryStakeInfo()) node.printStakingList() - if core.IsEpochBlock(newBlock) { - shardState := node.blockchain.StoreNewShardState(newBlock, &node.CurrentStakes) - if shardState != nil { - if nodeconfig.GetDefaultConfig().IsLeader() { - epochShardState := types.EpochShardState{Epoch: core.GetEpochFromBlockNumber(newBlock.NumberU64()), ShardState: shardState} - epochShardStateMessage := proto_node.ConstructEpochShardStateMessage(epochShardState) - // Broadcast new shard state - err := node.host.SendMessageToGroups([]p2p.GroupID{node.NodeConfig.GetClientGroupID()}, host.ConstructP2pMessage(byte(0), epochShardStateMessage)) - if err != nil { - utils.GetLogInstance().Error("[Resharding] failed to broadcast shard state message", "group", node.NodeConfig.GetClientGroupID()) - } else { - utils.GetLogInstance().Info("[Resharding] broadcasted shard state message to", "group", node.NodeConfig.GetClientGroupID()) - } - } - node.processEpochShardState(&types.EpochShardState{Epoch: core.GetEpochFromBlockNumber(newBlock.NumberU64()), ShardState: shardState}) + } + newBlockHeader := newBlock.Header() + if newBlockHeader.ShardStateHash != (common.Hash{}) { + if node.Consensus.ShardID == 0 { + // TODO ek – this is a temp hack until beacon chain sync is fixed + // End-of-epoch block on beacon chain; block's EpochState is the + // master resharding table. Broadcast it to the network. + if err := node.broadcastEpochShardState(newBlock); err != nil { + e := ctxerror.New("cannot broadcast shard state").WithCause(err) + ctxerror.Log15(utils.GetLogInstance().Error, e) } } + node.transitionIntoNextEpoch(newBlockHeader.ShardState) } } +func (node *Node) broadcastEpochShardState(newBlock *types.Block) error { + epochShardStateMessage := proto_node.ConstructEpochShardStateMessage( + types.EpochShardState{ + Epoch: newBlock.Header().Epoch.Uint64() + 1, + ShardState: newBlock.Header().ShardState, + }, + ) + return node.host.SendMessageToGroups( + []p2p.GroupID{node.NodeConfig.GetClientGroupID()}, + host.ConstructP2pMessage(byte(0), epochShardStateMessage)) +} + // AddNewBlock is usedd to add new block into the blockchain. func (node *Node) AddNewBlock(newBlock *types.Block) { - blockNum, err := node.blockchain.InsertChain([]*types.Block{newBlock}) + blockNum, err := node.Blockchain().InsertChain([]*types.Block{newBlock}) if err != nil { utils.GetLogInstance().Debug("Error adding new block to blockchain", "blockNum", blockNum, "hash", newBlock.Header().Hash(), "Error", err) } else { @@ -473,7 +590,10 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { } if pong.ShardID != node.Consensus.ShardID { - utils.GetLogInstance().Error("Received Pong message for the wrong shard", "receivedShardID", pong.ShardID) + utils.GetLogInstance().Error( + "Received Pong message for the wrong shard", + "receivedShardID", pong.ShardID, + "expectedShardID", node.Consensus.ShardID) return 0 } @@ -547,89 +667,115 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { return 0 } -func (node *Node) epochShardStateMessageHandler(msgPayload []byte) int { - utils.GetLogInstance().Error("[Received new shard state]") +func (node *Node) epochShardStateMessageHandler(msgPayload []byte) error { + logger := utils.GetLogInstance() + getLogger := func() log.Logger { return utils.WithCallerSkip(logger, 1) } epochShardState, err := proto_node.DeserializeEpochShardStateFromMessage(msgPayload) if err != nil { - utils.GetLogInstance().Error("Can't get shard state Message", "error", err) - return -1 + return ctxerror.New("Can't get shard state message").WithCause(err) } - if (node.Consensus != nil && node.Consensus.ShardID != 0) || node.NodeConfig.Role() == nodeconfig.NewNode { - node.processEpochShardState(epochShardState) + if node.Consensus == nil && node.NodeConfig.Role() != nodeconfig.NewNode { + return nil } - return 0 + receivedEpoch := big.NewInt(int64(epochShardState.Epoch)) + getLogger().Info("received new shard state", "epoch", receivedEpoch) + node.nextShardState.master = epochShardState + if node.NodeConfig.IsLeader() { + // Wait a bit to allow the master table to reach other validators. + node.nextShardState.proposeTime = time.Now().Add(5 * time.Second) + } else { + // Wait a bit to allow the master table to reach the leader, + // and to allow the leader to propose next shard state based upon it. + node.nextShardState.proposeTime = time.Now().Add(15 * time.Second) + } + // TODO ek – this should be done from replaying beaconchain once + // beaconchain sync is fixed + err = node.Beaconchain().WriteShardState( + receivedEpoch, epochShardState.ShardState) + if err != nil { + return ctxerror.New("cannot store shard state", "epoch", receivedEpoch). + WithCause(err) + } + return nil } -func (node *Node) processEpochShardState(epochShardState *types.EpochShardState) { - shardState := epochShardState.ShardState - epoch := epochShardState.Epoch +func (node *Node) transitionIntoNextEpoch(shardState types.ShardState) { + logger := utils.GetLogInstance() + getLogger := func() log.Logger { return utils.WithCallerSkip(logger, 1) } + logger = logger.New( + "blsPubKey", hex.EncodeToString(node.Consensus.PubKey.Serialize()), + "curShard", node.Blockchain().ShardID(), + "curLeader", node.NodeConfig.IsLeader()) for _, c := range shardState { - utils.GetLogInstance().Debug("new shard information", "shardID", c.ShardID, "NodeList", c.NodeList) - } - - myShardID := uint32(math.MaxUint32) - isNextLeader := false - myBlsPubKey := node.Consensus.PubKey.Serialize() - myShardState := types.Committee{} - for _, shard := range shardState { - for _, nodeID := range shard.NodeList { - if bytes.Compare(nodeID.BlsPublicKey[:], myBlsPubKey) == 0 { - myShardID = shard.ShardID - isNextLeader = shard.Leader == nodeID - myShardState = shard - } - } + logger.Debug("new shard information", + "shardID", c.ShardID, + "nodeList", c.NodeList) + } + myShardID, isNextLeader := findRoleInShardState( + node.Consensus.PubKey, shardState) + logger = logger.New( + "nextShard", myShardID, + "nextLeader", isNextLeader) + + if myShardID == math.MaxUint32 { + getLogger().Info("Somehow I got kicked out. Exiting") + os.Exit(8) // 8 represents it's a loop and the program restart itself } - if myShardID != uint32(math.MaxUint32) { - // Update public keys - ss := myShardState - publicKeys := []*bls.PublicKey{} - for _, nodeID := range ss.NodeList { - key := &bls.PublicKey{} - err := key.Deserialize(nodeID.BlsPublicKey[:]) - if err != nil { - utils.GetLogInstance().Error("Failed to deserialize BLS public key in shard state", "error", err) - } - publicKeys = append(publicKeys, key) + myShardState := shardState[myShardID] + + // Update public keys + var publicKeys []*bls.PublicKey + for idx, nodeID := range myShardState.NodeList { + key := &bls.PublicKey{} + err := key.Deserialize(nodeID.BlsPublicKey[:]) + if err != nil { + getLogger().Error("Failed to deserialize BLS public key in shard state", + "idx", idx, + "error", err) } - node.Consensus.UpdatePublicKeys(publicKeys) - node.DRand.UpdatePublicKeys(publicKeys) + publicKeys = append(publicKeys, key) + } + node.Consensus.UpdatePublicKeys(publicKeys) + node.DRand.UpdatePublicKeys(publicKeys) - aboutLeader := "" - if nodeconfig.GetDefaultConfig().IsLeader() { - aboutLeader = "I am not leader anymore" - if isNextLeader { - aboutLeader = "I am still leader" - } - } else { - aboutLeader = "I am still validator" - if isNextLeader { - aboutLeader = "I become the leader" - } + if node.Blockchain().ShardID() == myShardID { + getLogger().Info("staying in the same shard") + } else { + getLogger().Info("moving to another shard") + if err := node.shardChains.Close(); err != nil { + getLogger().Error("cannot close shard chains", "error", err) } - if node.blockchain.ShardID() == myShardID { - utils.GetLogInstance().Info(fmt.Sprintf("[Resharded][epoch:%d] I stay at shard %d, %s", epoch, myShardID, aboutLeader), "BlsPubKey", hex.EncodeToString(myBlsPubKey)) - } else { - utils.GetLogInstance().Info(fmt.Sprintf("[Resharded][epoch:%d] I got resharded to shard %d from shard %d, %s", epoch, myShardID, node.blockchain.ShardID(), aboutLeader), "BlsPubKey", hex.EncodeToString(myBlsPubKey)) - node.storeEpochShardState(epochShardState) + restartProcess(getRestartArguments(myShardID)) + } +} - execFile, err := getBinaryPath() - if err != nil { - utils.GetLogInstance().Crit("Failed to get program path when restarting program", "error", err, "file", execFile) - } - args := getRestartArguments(myShardID) - utils.GetLogInstance().Info("Restarting program", "args", args, "env", os.Environ()) - err = syscall.Exec(execFile, args, os.Environ()) - if err != nil { - utils.GetLogInstance().Crit("Failed to restart program after resharding", "error", err) +func findRoleInShardState( + key *bls.PublicKey, state types.ShardState, +) (shardID uint32, isLeader bool) { + keyBytes := key.Serialize() + for idx, shard := range state { + for nodeIdx, nodeID := range shard.NodeList { + if bytes.Compare(nodeID.BlsPublicKey[:], keyBytes) == 0 { + return uint32(idx), nodeIdx == 0 } } - } else { - utils.GetLogInstance().Info(fmt.Sprintf("[Resharded][epoch:%d] Somehow I got kicked out. Exiting", epoch), "BlsPubKey", hex.EncodeToString(myBlsPubKey)) - os.Exit(8) // 8 represents it's a loop and the program restart itself } + return math.MaxUint32, false +} + +func restartProcess(args []string) { + execFile, err := getBinaryPath() + if err != nil { + utils.GetLogInstance().Crit("Failed to get program path when restarting program", "error", err, "file", execFile) + } + utils.GetLogInstance().Info("Restarting program", "args", args, "env", os.Environ()) + err = syscall.Exec(execFile, args, os.Environ()) + if err != nil { + utils.GetLogInstance().Crit("Failed to restart program after resharding", "error", err) + } + panic("syscall.Exec() is not supposed to return") } func getRestartArguments(myShardID uint32) []string { @@ -670,38 +816,6 @@ func getBinaryPath() (argv0 string, err error) { return } -// Stores the epoch shard state into local file -// TODO: think about storing it into level db. -func (node *Node) storeEpochShardState(epochShardState *types.EpochShardState) { - byteBuffer := bytes.NewBuffer([]byte{}) - encoder := gob.NewEncoder(byteBuffer) - err := encoder.Encode(epochShardState) - if err != nil { - utils.GetLogInstance().Error("[Resharded] Failed to encode epoch shard state", "error", err) - } - err = ioutil.WriteFile("./epoch_shard_state"+node.SelfPeer.IP+node.SelfPeer.Port, byteBuffer.Bytes(), 0644) - if err != nil { - utils.GetLogInstance().Error("[Resharded] Failed to store epoch shard state in local file", "error", err) - } -} - -func (node *Node) retrieveEpochShardState() (*types.EpochShardState, error) { - b, err := ioutil.ReadFile("./epoch_shard_state" + node.SelfPeer.IP + node.SelfPeer.Port) - if err != nil { - utils.GetLogInstance().Error("[Resharded] Failed to retrieve epoch shard state", "error", err) - } - epochShardState := new(types.EpochShardState) - - r := bytes.NewBuffer(b) - decoder := gob.NewDecoder(r) - err = decoder.Decode(epochShardState) - - if err != nil { - return nil, fmt.Errorf("Decode local epoch shard state error") - } - return epochShardState, nil -} - // ConsensusMessageHandler passes received message in node_handler to consensus func (node *Node) ConsensusMessageHandler(msgPayload []byte) { if node.Consensus.ConsensusVersion == "v1" { diff --git a/node/node_handler_test.go b/node/node_handler_test.go index c1d0db57c..6f3b54dbf 100644 --- a/node/node_handler_test.go +++ b/node/node_handler_test.go @@ -23,7 +23,7 @@ func TestAddNewBlock(t *testing.T) { if err != nil { t.Fatalf("Cannot craeate consensus: %v", err) } - node := New(host, consensus, nil, false) + node := New(host, consensus, testDBFactory, false) selectedTxs := node.getTransactionsForNewBlock(MaxNumberOfTransactionsPerBlock) node.Worker.CommitTransactions(selectedTxs) @@ -31,7 +31,7 @@ func TestAddNewBlock(t *testing.T) { node.AddNewBlock(block) - if node.blockchain.CurrentBlock().NumberU64() != 1 { + if node.Blockchain().CurrentBlock().NumberU64() != 1 { t.Error("New block is not added successfully") } } @@ -48,13 +48,13 @@ func TestVerifyNewBlock(t *testing.T) { if err != nil { t.Fatalf("Cannot craeate consensus: %v", err) } - node := New(host, consensus, nil, false) + node := New(host, consensus, testDBFactory, false) selectedTxs := node.getTransactionsForNewBlock(MaxNumberOfTransactionsPerBlock) node.Worker.CommitTransactions(selectedTxs) block, _ := node.Worker.Commit() - if !node.VerifyNewBlock(block) { - t.Error("New block is not verified successfully") + if err := node.VerifyNewBlock(block); err != nil { + t.Error("New block is not verified successfully:", err) } } diff --git a/node/node_newblock.go b/node/node_newblock.go index fd7e76cb8..55ebcfa9c 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -1,9 +1,15 @@ package node import ( + "math/big" "time" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + + "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" ) @@ -12,7 +18,8 @@ const ( DefaultThreshold = 1 FirstTimeThreshold = 2 ConsensusTimeOut = 10 - PeriodicBlock = 3 * time.Second + PeriodicBlock = 1 * time.Second + BlockPeriod = 10 * time.Second ) // WaitForConsensusReady listen for the readiness signal from consensus and generate new block for consensus. @@ -27,6 +34,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}, stopChan chan firstTime := true var newBlock *types.Block timeoutCount := 0 + deadline := time.Now().Add(BlockPeriod) for { // keep waiting for Consensus ready select { @@ -50,25 +58,30 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}, stopChan chan threshold = FirstTimeThreshold firstTime = false } - if len(node.pendingTransactions) >= threshold { - utils.GetLogInstance().Debug("PROPOSING NEW BLOCK ------------------------------------------------", "blockNum", node.blockchain.CurrentBlock().NumberU64()+1, "threshold", threshold, "pendingTransactions", len(node.pendingTransactions)) + if len(node.pendingTransactions) >= threshold || !time.Now().Before(deadline) { + deadline = time.Now().Add(BlockPeriod) + utils.GetLogInstance().Debug("PROPOSING NEW BLOCK ------------------------------------------------", "blockNum", node.Blockchain().CurrentBlock().NumberU64()+1, "threshold", threshold, "pendingTransactions", len(node.pendingTransactions)) // Normal tx block consensus selectedTxs := node.getTransactionsForNewBlock(MaxNumberOfTransactionsPerBlock) - if len(selectedTxs) != 0 { - node.Worker.CommitTransactions(selectedTxs) - block, err := node.Worker.Commit() - if err != nil { - utils.GetLogInstance().Debug("Failed committing new block", "Error", err) - } else { - if node.Consensus.ShardID == 0 { - // add new shard state if it's epoch block - // TODO: bug fix - the stored shard state between here and PostConsensusProcessing are different. - //node.addNewShardStateHash(block) - } - newBlock = block - utils.GetLogInstance().Debug("Successfully proposed new block", "blockNum", block.NumberU64(), "numTxs", block.Transactions().Len()) - break - } + if err := node.Worker.CommitTransactions(selectedTxs); err != nil { + ctxerror.Log15(utils.GetLogger().Error, + ctxerror.New("cannot commit transacttions"). + WithCause(err)) + } + block, err := node.Worker.Commit() + if err != nil { + ctxerror.Log15(utils.GetLogInstance().Error, + ctxerror.New("Failed committing new block"). + WithCause(err)) + } else if err := node.proposeShardState(block); err != nil { + ctxerror.Log15(utils.GetLogger().Error, + ctxerror.New("cannot add shard state"). + WithCause(err)) + } else { + newBlock = block + utils.GetLogInstance().Debug("Successfully proposed new block", "blockNum", block.NumberU64(), "numTxs", block.Transactions().Len()) + threshold = DefaultThreshold + break } } // If not enough transactions to run Consensus, @@ -85,15 +98,6 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}, stopChan chan }() } -func (node *Node) addNewShardStateHash(block *types.Block) { - shardState := node.blockchain.GetNewShardState(block, &node.CurrentStakes) - if shardState != nil { - shardHash := shardState.Hash() - utils.GetLogInstance().Debug("[Shard State Hash] adding new shard state", "shardHash", shardHash) - block.AddShardStateHash(shardHash) - } -} - // WaitForConsensusReadyv2 listen for the readiness signal from consensus and generate new block for consensus. // only leader will receive the ready signal // TODO: clean pending transactions for validators; or validators not prepare pending transactions @@ -115,6 +119,7 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch case <-readySignal: firstTry := true + deadline := time.Now().Add(BlockPeriod) for { if !firstTry { time.Sleep(PeriodicBlock) @@ -127,34 +132,92 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch threshold = FirstTimeThreshold firstTime = false } - if len(node.pendingTransactions) < threshold { + if len(node.pendingTransactions) < threshold && time.Now().Before(deadline) { continue } + deadline = time.Now().Add(BlockPeriod) // Normal tx block consensus selectedTxs := node.getTransactionsForNewBlock(MaxNumberOfTransactionsPerBlock) - if len(selectedTxs) == 0 { - continue + utils.GetLogInstance().Debug("PROPOSING NEW BLOCK ------------------------------------------------", "blockNum", node.Blockchain().CurrentBlock().NumberU64()+1, "threshold", threshold, "selectedTxs", len(selectedTxs)) + if err := node.Worker.CommitTransactions(selectedTxs); err != nil { + ctxerror.Log15(utils.GetLogger().Error, + ctxerror.New("cannot commit transactions"). + WithCause(err)) } - utils.GetLogInstance().Debug("PROPOSING NEW BLOCK ------------------------------------------------", "blockNum", node.blockchain.CurrentBlock().NumberU64()+1, "threshold", threshold, "selectedTxs", len(selectedTxs)) - node.Worker.CommitTransactions(selectedTxs) block, err := node.Worker.Commit() if err != nil { - utils.GetLogInstance().Debug("Failed committing new block", "Error", err) + ctxerror.Log15(utils.GetLogger().Error, + ctxerror.New("cannot commit new block"). + WithCause(err)) continue - } - if node.Consensus.ShardID == 0 { - // add new shard state if it's epoch block - // TODO: bug fix - the stored shard state between here and PostConsensusProcessing are different. - //node.addNewShardStateHash(block) - } - newBlock := block - utils.GetLogInstance().Debug("Successfully proposed new block", "blockNum", block.NumberU64(), "numTxs", block.Transactions().Len()) + } else if err := node.proposeShardState(block); err != nil { + ctxerror.Log15(utils.GetLogger().Error, + ctxerror.New("cannot add shard state"). + WithCause(err)) + } else { + newBlock := block + utils.GetLogInstance().Debug("Successfully proposed new block", "blockNum", block.NumberU64(), "numTxs", block.Transactions().Len()) - // Send the new block to Consensus so it can be confirmed. - node.BlockChannel <- newBlock - break + // Send the new block to Consensus so it can be confirmed. + node.BlockChannel <- newBlock + break + } } } } }() } + +func (node *Node) proposeShardState(block *types.Block) error { + switch node.Consensus.ShardID { + case 0: + return node.proposeBeaconShardState(block) + default: + node.proposeLocalShardState(block) + return nil + } +} + +func (node *Node) proposeBeaconShardState(block *types.Block) error { + // TODO ek - replace this with variable epoch logic. + if !core.IsEpochLastBlock(block) { + // We haven't reached the end of this epoch; don't propose yet. + return nil + } + nextEpoch := new(big.Int).Add(block.Header().Epoch, common.Big1) + shardState, err := core.CalculateNewShardState( + node.Blockchain(), nextEpoch, &node.CurrentStakes) + if err != nil { + return err + } + block.AddShardState(shardState) + return nil +} + +func (node *Node) proposeLocalShardState(block *types.Block) { + logger := block.Logger(utils.GetLogInstance()) + getLogger := func() log.Logger { return utils.WithCallerSkip(logger, 1) } + // TODO ek – read this from beaconchain once BC sync is fixed + if node.nextShardState.master == nil { + getLogger().Debug("yet to receive master proposal from beaconchain") + return + } + logger = logger.New( + "nextEpoch", node.nextShardState.master.Epoch, + "proposeTime", node.nextShardState.proposeTime) + if time.Now().Before(node.nextShardState.proposeTime) { + getLogger().Debug("still waiting for shard state to propagate") + return + } + masterShardState := node.nextShardState.master.ShardState + var localShardState types.ShardState + committee := masterShardState.FindCommitteeByID(block.ShardID()) + if committee != nil { + getLogger().Info("found local shard info; proposing it") + localShardState = append(localShardState, *committee) + } else { + getLogger().Info("beacon committee disowned us; proposing nothing") + // Leave local proposal empty to signal the end of shard (disbanding). + } + block.AddShardState(localShardState) +} diff --git a/node/node_syncing.go b/node/node_syncing.go index b157568f3..525c35b75 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -6,7 +6,9 @@ import ( "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rlp" + "github.com/harmony-one/harmony/api/service/syncing" "github.com/harmony-one/harmony/api/service/syncing/downloader" downloader_pb "github.com/harmony-one/harmony/api/service/syncing/downloader/proto" @@ -42,7 +44,7 @@ func (node *Node) getNeighborPeers(neighbor *sync.Map) []p2p.Peer { // DoSyncWithoutConsensus gets sync-ed to blockchain without joining consensus func (node *Node) DoSyncWithoutConsensus() { - go node.DoSyncing(node.blockchain, node.Worker, node.GetSyncingPeers, false) //Don't join consensus + go node.DoSyncing(node.Blockchain(), node.Worker, node.GetSyncingPeers, false) //Don't join consensus } // GetBeaconSyncingPeers returns a list of peers for beaconchain syncing @@ -71,7 +73,7 @@ func (node *Node) DoBeaconSyncing() { } } node.beaconSync.AddLastMileBlock(beaconBlock) - node.beaconSync.SyncLoop(node.beaconChain, node.BeaconWorker, false, true) + node.beaconSync.SyncLoop(node.Beaconchain(), node.BeaconWorker, false, true) } } } @@ -80,12 +82,16 @@ func (node *Node) DoBeaconSyncing() { func (node *Node) DoSyncing(bc *core.BlockChain, worker *worker.Worker, getPeers func() []p2p.Peer, willJoinConsensus bool) { ticker := time.NewTicker(SyncFrequency * time.Second) + logger := utils.GetLogInstance() + getLogger := func() log.Logger { return utils.WithCallerSkip(logger, 1) } SyncingLoop: for { select { case <-ticker.C: if node.stateSync == nil { node.stateSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID()) + logger = logger.New("syncID", node.GetSyncID()) + getLogger().Debug("initialized state sync") } if node.stateSync.GetActivePeerNumber() == 0 { peers := getPeers() @@ -100,7 +106,9 @@ SyncingLoop: node.State = NodeNotInSync node.stateMutex.Unlock() node.stateSync.SyncLoop(bc, worker, willJoinConsensus, false) + getLogger().Debug("now in sync") if willJoinConsensus { + getLogger().Debug("entering NodeReadyForConsensus state") node.stateMutex.Lock() node.State = NodeReadyForConsensus node.stateMutex.Unlock() @@ -129,7 +137,7 @@ func (node *Node) SupportSyncing() { go node.SendNewBlockToUnsync() if node.NodeConfig.Role() != nodeconfig.ShardLeader && node.NodeConfig.Role() != nodeconfig.BeaconLeader { - go node.DoSyncing(node.blockchain, node.Worker, node.GetSyncingPeers, true) + go node.DoSyncing(node.Blockchain(), node.Worker, node.GetSyncingPeers, true) } } @@ -192,12 +200,12 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest) (* case downloader_pb.DownloaderRequest_HEADER: var startHeaderHash []byte if request.BlockHash == nil { - tmp := node.blockchain.Genesis().Hash() + tmp := node.Blockchain().Genesis().Hash() startHeaderHash = tmp[:] } else { startHeaderHash = request.BlockHash } - for block := node.blockchain.CurrentBlock(); block != nil; block = node.blockchain.GetBlockByHash(block.Header().ParentHash) { + for block := node.Blockchain().CurrentBlock(); block != nil; block = node.Blockchain().GetBlockByHash(block.Header().ParentHash) { blockHash := block.Hash() if bytes.Compare(blockHash[:], startHeaderHash) == 0 { break @@ -209,7 +217,7 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest) (* for _, bytes := range request.Hashes { var hash common.Hash hash.SetBytes(bytes) - block := node.blockchain.GetBlockByHash(hash) + block := node.Blockchain().GetBlockByHash(hash) if block == nil { continue } @@ -220,7 +228,7 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest) (* } case downloader_pb.DownloaderRequest_BLOCKHEIGHT: - response.BlockHeight = node.blockchain.CurrentBlock().NumberU64() + response.BlockHeight = node.Blockchain().CurrentBlock().NumberU64() // this is the out of sync node acts as grpc server side case downloader_pb.DownloaderRequest_NEWBLOCK: diff --git a/node/node_test.go b/node/node_test.go index 921889374..cca7fc5d0 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -7,8 +7,10 @@ import ( "time" bls2 "github.com/harmony-one/harmony/crypto/bls" + "github.com/harmony-one/harmony/internal/shardchain" "github.com/harmony-one/bls/ffi/go/bls" + "github.com/harmony-one/harmony/drand" proto_discovery "github.com/harmony-one/harmony/api/proto/discovery" @@ -19,6 +21,8 @@ import ( "github.com/harmony-one/harmony/p2p/p2pimpl" ) +var testDBFactory = &shardchain.MemDBFactory{} + func TestNewNode(t *testing.T) { pubKey := bls2.RandPrivateKey().GetPublicKey() leader := p2p.Peer{IP: "127.0.0.1", Port: "8882", ConsensusPubKey: pubKey} @@ -31,16 +35,16 @@ func TestNewNode(t *testing.T) { if err != nil { t.Fatalf("Cannot craeate consensus: %v", err) } - node := New(host, consensus, nil, false) + node := New(host, consensus, testDBFactory, false) if node.Consensus == nil { t.Error("Consensus is not initialized for the node") } - if node.blockchain == nil { + if node.Blockchain() == nil { t.Error("Blockchain is not initialized for the node") } - if node.blockchain.CurrentBlock() == nil { + if node.Blockchain().CurrentBlock() == nil { t.Error("Genesis block is not initialized for the node") } } @@ -58,7 +62,7 @@ func TestGetSyncingPeers(t *testing.T) { if err != nil { t.Fatalf("Cannot craeate consensus: %v", err) } - node := New(host, consensus, nil, false) + node := New(host, consensus, testDBFactory, false) peer := p2p.Peer{IP: "127.0.0.1", Port: "8000"} peer2 := p2p.Peer{IP: "127.0.0.1", Port: "8001"} node.Neighbors.Store("minh", peer) @@ -102,7 +106,7 @@ func TestAddPeers(t *testing.T) { } dRand := drand.New(host, 0, []p2p.Peer{leader, validator}, leader, nil, nil) - node := New(host, consensus, nil, false) + node := New(host, consensus, testDBFactory, false) node.DRand = dRand r1 := node.AddPeers(peers1) e1 := 2 @@ -148,7 +152,7 @@ func TestAddBeaconPeer(t *testing.T) { } dRand := drand.New(host, 0, []p2p.Peer{leader, validator}, leader, nil, nil) - node := New(host, consensus, nil, false) + node := New(host, consensus, testDBFactory, false) node.DRand = dRand for _, p := range peers1 { ret := node.AddBeaconPeer(p) @@ -220,7 +224,7 @@ func TestPingPongHandler(t *testing.T) { if err != nil { t.Fatalf("Cannot craeate consensus: %v", err) } - node := New(host, consensus, nil, false) + node := New(host, consensus, testDBFactory, false) //go sendPingMessage(leader) go sendPongMessage(node, leader) go exitServer() diff --git a/node/rpc.go b/node/rpc.go index 9d99f4341..fb327282b 100644 --- a/node/rpc.go +++ b/node/rpc.go @@ -42,7 +42,7 @@ var ( // StartRPC start RPC service func (node *Node) StartRPC(nodePort string) error { // Gather all the possible APIs to surface - apiBackend = core.NewBackend(node.blockchain, node.TxPool, node.accountManager) + apiBackend = core.NewBackend(node.Blockchain(), node.TxPool, node.accountManager) apis := hmyapi.GetAPIs(apiBackend) for _, service := range node.serviceManager.GetServices() { diff --git a/node/service_setup.go b/node/service_setup.go index 87f3482d0..50ed7fd6b 100644 --- a/node/service_setup.go +++ b/node/service_setup.go @@ -37,14 +37,14 @@ func (node *Node) setupForShardLeader() { // Register new block service. node.serviceManager.RegisterService(service.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReadyv2)) // Register client support service. - node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) + node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.Blockchain().State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) } func (node *Node) setupForShardValidator() { nodeConfig, chanPeer := node.initNodeConfiguration() // Register client support service. - node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) + node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.Blockchain().State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) // Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node. node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer)) // Register networkinfo service. "0" is the beacon shard ID @@ -69,7 +69,7 @@ func (node *Node) setupForBeaconLeader() { // Register new block service. node.serviceManager.RegisterService(service.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReadyv2)) // Register client support service. - node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) + node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.Blockchain().State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) // TODO(minhdoan): We will remove the old client support and use the new client support which uses new message protocol. // Register client new support service. node.serviceManager.RegisterService(service.RestClientSupport, restclientsupport.New( @@ -86,7 +86,7 @@ func (node *Node) setupForBeaconValidator() { nodeConfig, chanPeer := node.initNodeConfiguration() // Register client support service. - node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) + node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.Blockchain().State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) // Register peer discovery service. No need to do staking for beacon chain node. node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, nil)) // Register networkinfo service. @@ -104,7 +104,7 @@ func (node *Node) setupForNewNode() { nodeConfig, chanPeer := node.initNodeConfiguration() // Register staking service. - node.serviceManager.RegisterService(service.Staking, staking.New(node.host, node.StakingAccount, node.beaconChain, node.NodeConfig.ConsensusPubKey)) + node.serviceManager.RegisterService(service.Staking, staking.New(node.host, node.StakingAccount, node.Beaconchain(), node.NodeConfig.ConsensusPubKey)) // Register peer discovery service. "0" is the beacon shard ID node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer)) // Register networkinfo service. "0" is the beacon shard ID diff --git a/node/staking.go b/node/staking.go index 49399656a..da5b659fb 100644 --- a/node/staking.go +++ b/node/staking.go @@ -41,7 +41,7 @@ func (node *Node) UpdateStakingList(stakeInfoReturnValue *structs.StakeInfoRetur lockPeriodCount := stakeInfoReturnValue.LockPeriodCounts[i] startEpoch := core.GetEpochFromBlockNumber(blockNum.Uint64()) - curEpoch := core.GetEpochFromBlockNumber(node.blockchain.CurrentBlock().NumberU64()) + curEpoch := core.GetEpochFromBlockNumber(node.Blockchain().CurrentBlock().NumberU64()) if startEpoch == curEpoch { continue // The token are counted into stakes at the beginning of next epoch. diff --git a/node/staking_test.go b/node/staking_test.go index 2adc07676..8ddf6b4ac 100644 --- a/node/staking_test.go +++ b/node/staking_test.go @@ -37,7 +37,7 @@ func TestUpdateStakingList(t *testing.T) { if err != nil { t.Fatalf("Cannot craeate consensus: %v", err) } - node := New(host, consensus, nil, false) + node := New(host, consensus, testDBFactory, false) for i := 0; i < 5; i++ { selectedTxs := node.getTransactionsForNewBlock(MaxNumberOfTransactionsPerBlock) diff --git a/node/worker/worker.go b/node/worker/worker.go index d4ca3f562..2af52bb6a 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -8,11 +8,13 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/params" + consensus_engine "github.com/harmony-one/harmony/consensus/engine" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/vm" + "github.com/harmony-one/harmony/internal/ctxerror" ) // environment is the worker's current environment and holds all of the current state information. @@ -110,11 +112,18 @@ func (w *Worker) UpdateCurrent() error { parent := w.chain.CurrentBlock() num := parent.Number() timestamp := time.Now().Unix() + // New block's epoch is the same as parent's... + epoch := new(big.Int).Set(parent.Header().Epoch) + if len(parent.Header().ShardState) > 0 { + // ... except if parent has a resharding assignment it increases by 1. + epoch = epoch.Add(epoch, common.Big1) + } header := &types.Header{ ParentHash: parent.Hash(), Number: num.Add(num, common.Big1), GasLimit: core.CalcGasLimit(parent, w.gasFloor, w.gasCeil), Time: big.NewInt(timestamp), + Epoch: epoch, ShardID: w.chain.ShardID(), } return w.makeCurrent(parent, header) @@ -150,7 +159,7 @@ func (w *Worker) Commit() (*types.Block, error) { s := w.current.state.Copy() block, err := w.engine.Finalize(w.chain, w.current.header, s, w.current.txs, w.current.receipts) if err != nil { - return nil, err + return nil, ctxerror.New("cannot finalize block").WithCause(err) } return block, nil } @@ -170,11 +179,18 @@ func New(config *params.ChainConfig, chain *core.BlockChain, engine consensus_en parent := worker.chain.CurrentBlock() num := parent.Number() timestamp := time.Now().Unix() + // New block's epoch is the same as parent's... + epoch := new(big.Int).Set(parent.Header().Epoch) + if len(parent.Header().ShardState) > 0 { + // ... except if parent has a resharding assignment it increases by 1. + epoch = epoch.Add(epoch, common.Big1) + } header := &types.Header{ ParentHash: parent.Hash(), Number: num.Add(num, common.Big1), GasLimit: core.CalcGasLimit(parent, worker.gasFloor, worker.gasCeil), Time: big.NewInt(timestamp), + Epoch: epoch, ShardID: worker.chain.ShardID(), } worker.makeCurrent(parent, header) diff --git a/test/deploy.sh b/test/deploy.sh index b3801ea8c..57ef03612 100755 --- a/test/deploy.sh +++ b/test/deploy.sh @@ -69,7 +69,7 @@ EOU exit 0 } -DB= +DB=false TXGEN=true DURATION= MIN=5 @@ -80,7 +80,7 @@ DRYRUN= while getopts "hdtD:m:s:nS" option; do case $option in h) usage ;; - d) DB='-db_supported' ;; + d) DB=true ;; t) TXGEN=false ;; D) DURATION=$OPTARG ;; m) MIN=$OPTARG ;; @@ -131,9 +131,15 @@ echo "launching boot node ..." $DRYRUN $ROOT/bin/bootnode -port 19876 > $log_folder/bootnode.log 2>&1 | tee -a $LOG_FILE & sleep 1 BN_MA=$(grep "BN_MA" $log_folder/bootnode.log | awk -F\= ' { print $2 } ') -HMY_OPT2=" -bootnodes $BN_MA" echo "bootnode launched." + " $BN_MA" -HMY_OPT3=" -is_genesis" + +unset -v base_args +declare -a base_args args +base_args=(-log_folder "${log_folder}" -min_peers "${MIN}" -bootnodes "${BN_MA}") +if "${DB}" +then + base_args=("${base_args[@]}" -db_supported) +fi NUM_NN=0 @@ -143,27 +149,20 @@ sleep 2 i=0 while IFS='' read -r line || [[ -n "$line" ]]; do IFS=' ' read ip port mode shardID <<< $line - if [ "$mode" == "leader" ]; then - echo "launching leader ..." - $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -account_index $i -min_peers $MIN $HMY_OPT2 $HMY_OPT3 -key /tmp/$ip-$port.key -is_leader 2>&1 | tee -a $LOG_FILE & - fi - if [ "$mode" == "leader_archival" ]; then - echo "launching leader ..." - $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -account_index $i -min_peers $MIN $HMY_OPT2 -key /tmp/$ip-$port.key -is_leader -is_archival 2>&1 | tee -a $LOG_FILE & - fi - if [ "$mode" == "validator" ]; then - echo "launching validator ..." - $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -account_index $i -min_peers $MIN $HMY_OPT2 $HMY_OPT3 -key /tmp/$ip-$port.key 2>&1 | tee -a $LOG_FILE & - fi - if [ "$mode" == "archival" ]; then - echo "launching archival node ... wait" - $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -account_index $i -min_peers $MIN $HMY_OPT2 -key /tmp/$ip-$port.key -is_archival 2>&1 | tee -a $LOG_FILE & - fi - if [[ "$mode" == "newnode" && "$SYNC" == "true" ]]; then - (( NUM_NN += 30 )) - echo "launching new node ..." - (sleep $NUM_NN; $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -account_index $i -min_peers $MIN $HMY_OPT2 -key /tmp/$ip-$port.key 2>&1 | tee -a $LOG_FILE ) & - fi + args=("${base_args[@]}" -ip "${ip}" -port "${port}" -account_index "${i}" -key "/tmp/${ip}-${port}.key" -db_dir "db-${ip}-${port}") + case "${mode}" in + leader*|validator*) args=("${args[@]}" -is_genesis);; + esac + case "${mode}" in leader*) args=("${args[@]}" -is_leader);; esac + case "${mode}" in *archival|archival) args=("${args[@]}" -is_archival);; esac + case "${mode}" in + newnode) + "${SYNC}" || continue + sleep "${NUM_NN}" + NUM_NN=$((${NUM_NN} + 30)) + ;; + esac + $DRYRUN "${ROOT}/bin/harmony" "${args[@]}" 2>&1 | tee -a "${LOG_FILE}" & i=$((i+1)) done < $config @@ -173,7 +172,7 @@ if [ "$TXGEN" == "true" ]; then line=$(grep client $config) IFS=' ' read ip port mode shardID <<< $line if [ "$mode" == "client" ]; then - $DRYRUN $ROOT/bin/txgen -log_folder $log_folder -duration $DURATION -ip $ip -port $port $HMY_OPT2 2>&1 | tee -a $LOG_FILE + $DRYRUN $ROOT/bin/txgen -log_folder $log_folder -duration $DURATION -ip $ip -port $port -bootnodes "${BN_MA}" > $LOG_FILE 2>&1 fi else sleep $DURATION