diff --git a/api/service/explorer/service.go b/api/service/explorer/service.go index a4ee5bf28..603eecfde 100644 --- a/api/service/explorer/service.go +++ b/api/service/explorer/service.go @@ -16,8 +16,10 @@ import ( "github.com/gorilla/mux" libp2p_peer "github.com/libp2p/go-libp2p-peer" + "github.com/harmony-one/bls/ffi/go/bls" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/core/types" + bls2 "github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/internal/bech32" common2 "github.com/harmony-one/harmony/internal/common" "github.com/harmony-one/harmony/internal/ctxerror" @@ -198,11 +200,54 @@ func (s *Service) GetExplorerBlocks(w http.ResponseWriter, r *http.Request) { } accountBlocks := s.ReadBlocksFromDB(fromInt, toInt) + curEpoch := int64(-1) + committee := &types.Committee{} for id, accountBlock := range accountBlocks { if id == 0 || id == len(accountBlocks)-1 || accountBlock == nil { continue } block := NewBlock(accountBlock, id+fromInt-1) + if int64(block.Epoch) > curEpoch { + if bytes, err := db.Get([]byte(GetCommitteeKey(uint32(s.ShardID), block.Epoch))); err == nil { + committee = &types.Committee{} + if err = rlp.DecodeBytes(bytes, committee); err != nil { + utils.Logger().Warn().Err(err).Msg("cannot read committee for new epoch") + } + } else { + state, err := accountBlock.Header().GetShardState() + if err == nil { + for _, shardCommittee := range state { + if shardCommittee.ShardID == accountBlock.ShardID() { + committee = &shardCommittee + break + } + } + } + } + curEpoch = int64(block.Epoch) + } + pubkeys := make([]*bls.PublicKey, len(committee.NodeList)) + for i, validator := range committee.NodeList { + pubkeys[i] = new(bls.PublicKey) + validator.BlsPublicKey.ToLibBLSPublicKey(pubkeys[i]) + } + mask, err := bls2.NewMask(pubkeys, nil) + if err == nil && accountBlocks[id+1] != nil { + err = mask.SetMask(accountBlocks[id+1].Header().LastCommitBitmap) + if err == nil { + for _, validator := range committee.NodeList { + oneAddress, err := common2.AddressToBech32(validator.EcdsaAddress) + if err != nil { + continue + } + blsPublicKey := new(bls.PublicKey) + validator.BlsPublicKey.ToLibBLSPublicKey(blsPublicKey) + if ok, err := mask.KeyEnabled(blsPublicKey); err == nil && ok { + block.Signers = append(block.Signers, oneAddress) + } + } + } + } // Populate transactions for _, tx := range accountBlock.Transactions() { transaction := GetTransaction(tx, accountBlock) diff --git a/api/service/explorer/structs.go b/api/service/explorer/structs.go index 650d589a2..8d3f79c0f 100644 --- a/api/service/explorer/structs.go +++ b/api/service/explorer/structs.go @@ -6,7 +6,6 @@ import ( "strconv" "github.com/harmony-one/harmony/core/types" - "github.com/harmony-one/harmony/internal/common" "github.com/harmony-one/harmony/internal/utils" ) @@ -88,22 +87,20 @@ type Shard struct { func NewBlock(block *types.Block, height int) *Block { // TODO(ricl): use block.Header().CommitBitmap and GetPubKeyFromMask signers := []string{} - state, err := block.Header().GetShardState() + /*state, err := block.Header().GetShardState() if err == nil { for _, committee := range state { if committee.ShardID == block.ShardID() { - for _, validator := range committee.NodeList { + for i, validator := range committee.NodeList { oneAddress, err := common.AddressToBech32(validator.EcdsaAddress) - if err != nil { + if err != nil && block.Header().LastCommitBitmap[i] != 0x0 { continue } signers = append(signers, oneAddress) } } } - } else { - utils.Logger().Warn().Err(err).Msgf("bad state block %d", block.NumberU64()) - } + }*/ return &Block{ Height: strconv.Itoa(height), ID: block.Hash().Hex(), diff --git a/api/service/syncing/syncing.go b/api/service/syncing/syncing.go index a4a62f104..743bf73bb 100644 --- a/api/service/syncing/syncing.go +++ b/api/service/syncing/syncing.go @@ -620,6 +620,7 @@ func (ss *StateSync) generateNewState(bc *core.BlockChain, worker *worker.Worker } // ProcessStateSync processes state sync from the blocks received but not yet processed so far +// TODO: return error func (ss *StateSync) ProcessStateSync(startHash []byte, size uint32, bc *core.BlockChain, worker *worker.Worker) { // Gets consensus hashes. if !ss.GetConsensusHashes(startHash, size) { @@ -691,7 +692,7 @@ func (ss *StateSync) getMaxPeerHeight() uint64 { go func() { defer wg.Done() //debug - // utils.Logger().Warn().Str("IP", peerConfig.ip).Str("Port", peerConfig.port).Msg("[Sync]getMaxPeerHeight") + utils.Logger().Debug().Str("IP", peerConfig.ip).Str("Port", peerConfig.port).Msg("[Sync]getMaxPeerHeight") response, err := peerConfig.client.GetBlockChainHeight() if err != nil { utils.Logger().Warn().Err(err).Str("IP", peerConfig.ip).Str("Port", peerConfig.port).Msg("[Sync]GetBlockChainHeight failed") @@ -733,15 +734,17 @@ func (ss *StateSync) SyncLoop(bc *core.BlockChain, worker *worker.Worker, willJo if !isBeacon { ss.RegisterNodeInfo() } + // remove SyncLoopFrequency ticker := time.NewTicker(SyncLoopFrequency * time.Second) +Loop: for { select { case <-ticker.C: otherHeight := ss.getMaxPeerHeight() currentHeight := bc.CurrentBlock().NumberU64() if currentHeight >= otherHeight { - utils.Logger().Info().Msgf("[SYNC] Node is now IN SYNC! (ShardID: %d)", bc.ShardID()) - break + utils.Logger().Info().Msgf("[SYNC] Node is now IN SYNC! (ShardID: %d, otherHeight: %d, currentHeight: %d)", bc.ShardID(), otherHeight, currentHeight) + break Loop } startHash := bc.CurrentBlock().Hash() size := uint32(otherHeight - currentHeight) diff --git a/cmd/client/wallet/main.go b/cmd/client/wallet/main.go index a663d5a42..d1e5e694f 100644 --- a/cmd/client/wallet/main.go +++ b/cmd/client/wallet/main.go @@ -92,8 +92,8 @@ var ( transferReceiverPtr = transferCommand.String("to", "", "Specify the receiver account") transferAmountPtr = transferCommand.Float64("amount", 0, "Specify the amount to transfer") transferGasPricePtr = transferCommand.Uint64("gasPrice", 0, "Specify the gas price amount. Unit is Nano.") - transferShardIDPtr = transferCommand.Int("shardID", 0, "Specify the shard ID for the transfer") - transferToShardIDPtr = transferCommand.Int("toShardID", 0, "Specify the destination shard ID for the transfer") + transferShardIDPtr = transferCommand.Int("shardID", -1, "Specify the shard ID for the transfer") + transferToShardIDPtr = transferCommand.Int("toShardID", -1, "Specify the destination shard ID for the transfer") transferInputDataPtr = transferCommand.String("inputData", "", "Base64-encoded input data to embed in the transaction") transferSenderPassPtr = transferCommand.String("pass", "", "Passphrase of the sender's private key") diff --git a/core/blockchain.go b/core/blockchain.go index 219dbcc08..226f30c82 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1130,6 +1130,7 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { Str("parentHash", header.ParentHash.Hex()). Msg("added block to chain") + // TODO: move into WriteBlockWithState if header.ShardStateHash != (common.Hash{}) { epoch := new(big.Int).Add(header.Epoch, common.Big1) err = bc.WriteShardStateBytes(epoch, header.ShardState) @@ -1138,6 +1139,8 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) { return n, err } } + + // TODO: move into WriteBlockWithState if len(header.CrossLinks) > 0 { crossLinks := &types.CrossLinks{} err = rlp.DecodeBytes(header.CrossLinks, crossLinks) @@ -2154,7 +2157,7 @@ func (bc *BlockChain) NextCXReceiptsCheckpoint(currentNum uint64, shardID uint32 for num := lastCheckpoint; num <= currentNum+1; num++ { by, _ := rawdb.ReadCXReceiptsProofSpent(bc.db, shardID, num) if by == rawdb.NAByte { - // TODO: check if there is IncompingReceiptsHash in crosslink header + // TODO chao: check if there is IncompingReceiptsHash in crosslink header // if the rootHash is non-empty, it means incomingReceipts are not delivered // otherwise, it means there is no cross-shard transactions for this block newCheckpoint = num diff --git a/core/chain_makers.go b/core/chain_makers.go index 55dc31db7..a16df5bb1 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -96,7 +96,6 @@ func (b *BlockGen) AddTxWithChain(bc *BlockChain, tx *types.Transaction) { b.SetCoinbase(common.Address{}) } b.statedb.Prepare(tx.Hash(), common.Hash{}, len(b.txs)) - // TODO (chao): may need to add cxReceipt for BlockGen receipt, _, _, err := ApplyTransaction(b.config, bc, &b.header.Coinbase, b.gasPool, b.statedb, b.header, tx, &b.header.GasUsed, vm.Config{}) if err != nil { panic(err) @@ -186,7 +185,6 @@ func GenerateChain(config *params.ChainConfig, parent *types.Block, engine conse } if b.engine != nil { // Finalize and seal the block - // TODO (chao): add cxReceipt in the last input block, err := b.engine.Finalize(chainreader, b.header, statedb, b.txs, b.receipts, nil, nil) if err != nil { panic(err) diff --git a/core/evm.go b/core/evm.go index ecc5b6321..8731f5b12 100644 --- a/core/evm.go +++ b/core/evm.go @@ -95,7 +95,7 @@ func Transfer(db vm.StateDB, sender, recipient common.Address, amount *big.Int, if txType == types.SameShardTx || txType == types.SubtractionOnly { db.SubBalance(sender, amount) } - if txType == types.SameShardTx || txType == types.AdditionOnly { + if txType == types.SameShardTx { db.AddBalance(recipient, amount) } } diff --git a/core/state_processor.go b/core/state_processor.go index 75288eb1b..895811df7 100644 --- a/core/state_processor.go +++ b/core/state_processor.go @@ -85,7 +85,10 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.DB, cfg vm.C // incomingReceipts should always be processed after transactions (to be consistent with the block proposal) for _, cx := range block.IncomingReceipts() { - ApplyIncomingReceipt(p.config, statedb, header, cx) + err := ApplyIncomingReceipt(p.config, statedb, header, cx) + if err != nil { + return nil, nil, nil, 0, ctxerror.New("cannot apply incoming receipts").WithCause(err) + } } // Finalize the block, applying any consensus engine specific extras (e.g. block rewards) @@ -102,7 +105,9 @@ func getTransactionType(header *types.Header, tx *types.Transaction) types.Trans if tx.ShardID() == tx.ToShardID() && header.ShardID == tx.ShardID() { return types.SameShardTx } - if tx.ShardID() != tx.ToShardID() && header.ShardID == tx.ShardID() { + numShards := ShardingSchedule.InstanceForEpoch(header.Epoch).NumShards() + // Assuming here all the shards are consecutive from 0 to n-1, n is total number of shards + if tx.ShardID() != tx.ToShardID() && header.ShardID == tx.ShardID() && tx.ToShardID() < numShards { return types.SubtractionOnly } return types.InvalidTx @@ -167,16 +172,15 @@ func ApplyTransaction(config *params.ChainConfig, bc ChainContext, author *commo } // ApplyIncomingReceipt will add amount into ToAddress in the receipt -func ApplyIncomingReceipt(config *params.ChainConfig, db *state.DB, header *types.Header, cxp *types.CXReceiptsProof) { +func ApplyIncomingReceipt(config *params.ChainConfig, db *state.DB, header *types.Header, cxp *types.CXReceiptsProof) error { if cxp == nil { - return + return nil } // TODO: how to charge gas here? for _, cx := range cxp.Receipts { if cx == nil || cx.To == nil { // should not happend - utils.Logger().Warn().Msg("ApplyIncomingReceipts: Invalid incoming receipt!!") - continue + return ctxerror.New("ApplyIncomingReceipts: Invalid incomingReceipt!", "receipt", cx) } utils.Logger().Info().Msgf("ApplyIncomingReceipts: ADDING BALANCE %d", cx.Amount) @@ -186,4 +190,5 @@ func ApplyIncomingReceipt(config *params.ChainConfig, db *state.DB, header *type db.AddBalance(*cx.To, cx.Amount) db.IntermediateRoot(config.IsEIP158(header.Number)).Bytes() } + return nil } diff --git a/core/types/block.go b/core/types/block.go index f8da8b304..a4ab795ec 100644 --- a/core/types/block.go +++ b/core/types/block.go @@ -473,9 +473,10 @@ func (b *Block) WithBody(transactions []*Transaction, uncles []*Header, incoming header: CopyHeader(b.header), transactions: make([]*Transaction, len(transactions)), uncles: make([]*Header, len(uncles)), - incomingReceipts: incomingReceipts, + incomingReceipts: make([]*CXReceiptsProof, len(incomingReceipts)), } copy(block.transactions, transactions) + copy(block.incomingReceipts, incomingReceipts) for i := range uncles { block.uncles[i] = CopyHeader(uncles[i]) } diff --git a/core/types/transaction.go b/core/types/transaction.go index cc7ca3999..757bf8a4a 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -45,7 +45,6 @@ type TransactionType byte const ( SameShardTx TransactionType = iota SubtractionOnly // only subtract tokens from source shard account - AdditionOnly // only add tokens to destination shard account InvalidTx ) @@ -64,8 +63,8 @@ func (txType TransactionType) String() string { return "SameShardTx" } else if txType == SubtractionOnly { return "SubtractionOnly" - } else if txType == AdditionOnly { - return "AdditionOnly" + } else if txType == InvalidTx { + return "InvalidTx" } return "Unknown" } diff --git a/core/types/transaction_signing.go b/core/types/transaction_signing.go index 3d86a4ebe..5610eb31a 100644 --- a/core/types/transaction_signing.go +++ b/core/types/transaction_signing.go @@ -233,10 +233,10 @@ func (fs FrontierSigner) Sender(tx *Transaction) (common.Address, error) { } func recoverPlain(sighash common.Hash, R, S, Vb *big.Int, homestead bool) (common.Address, error) { - V := byte(Vb.Uint64() - 27) if Vb.BitLen() > 8 { return common.Address{}, ErrInvalidSig } + V := byte(Vb.Uint64() - 27) if !crypto.ValidateSignatureValues(V, R, S, homestead) { return common.Address{}, ErrInvalidSig } diff --git a/internal/configs/sharding/fixedschedule.go b/internal/configs/sharding/fixedschedule.go index 796545b39..97e83c8c3 100644 --- a/internal/configs/sharding/fixedschedule.go +++ b/internal/configs/sharding/fixedschedule.go @@ -90,6 +90,10 @@ func (s fixedSchedule) TxsThrottleConfig() *TxsThrottleConfig { } } +func (s fixedSchedule) GetNetworkID() NetworkID { + return DevNet +} + // NewFixedSchedule returns a sharding configuration schedule that uses the // given config instance for all epochs. Useful for testing. func NewFixedSchedule(instance Instance) Schedule { diff --git a/internal/configs/sharding/instance.go b/internal/configs/sharding/instance.go index 6fe672e4a..66ed94844 100644 --- a/internal/configs/sharding/instance.go +++ b/internal/configs/sharding/instance.go @@ -7,6 +7,18 @@ import ( "github.com/harmony-one/harmony/internal/genesis" ) +// NetworkID is the network type of the blockchain. +type NetworkID byte + +//Consensus and other message categories +const ( + MainNet NetworkID = iota + TestNet + LocalNet + Pangaea + DevNet +) + type instance struct { numShards uint32 numNodesPerShard int @@ -118,3 +130,8 @@ func (sc instance) FindAccount(blsPubKey string) (bool, *genesis.DeployAccount) func (sc instance) ReshardingEpoch() []*big.Int { return sc.reshardingEpoch } + +// ReshardingEpoch returns the list of epoch number +func (sc instance) GetNetworkID() NetworkID { + return DevNet +} diff --git a/internal/configs/sharding/localnet.go b/internal/configs/sharding/localnet.go index a044cfb57..9e09e24ad 100644 --- a/internal/configs/sharding/localnet.go +++ b/internal/configs/sharding/localnet.go @@ -127,6 +127,10 @@ func (ls localnetSchedule) TxsThrottleConfig() *TxsThrottleConfig { } } +func (ls localnetSchedule) GetNetworkID() NetworkID { + return LocalNet +} + var localnetReshardingEpoch = []*big.Int{big.NewInt(0), big.NewInt(localnetV1Epoch), big.NewInt(localnetV2Epoch)} var localnetV0 = MustNewInstance(2, 7, 5, genesis.LocalHarmonyAccounts, genesis.LocalFnAccounts, localnetReshardingEpoch) diff --git a/internal/configs/sharding/mainnet.go b/internal/configs/sharding/mainnet.go index 9f0b4038f..f1048c1a1 100644 --- a/internal/configs/sharding/mainnet.go +++ b/internal/configs/sharding/mainnet.go @@ -142,6 +142,10 @@ func (ms mainnetSchedule) TxsThrottleConfig() *TxsThrottleConfig { } } +func (ms mainnetSchedule) GetNetworkID() NetworkID { + return MainNet +} + var mainnetReshardingEpoch = []*big.Int{big.NewInt(0), big.NewInt(mainnetV0_1Epoch), big.NewInt(mainnetV0_2Epoch), big.NewInt(mainnetV0_3Epoch), big.NewInt(mainnetV0_4Epoch), big.NewInt(mainnetV1Epoch)} var mainnetV0 = MustNewInstance(4, 150, 112, genesis.HarmonyAccounts, genesis.FoundationalNodeAccounts, mainnetReshardingEpoch) diff --git a/internal/configs/sharding/pangaea.go b/internal/configs/sharding/pangaea.go index 8841e47f8..40b6c7a4b 100644 --- a/internal/configs/sharding/pangaea.go +++ b/internal/configs/sharding/pangaea.go @@ -89,3 +89,7 @@ func (ps pangaeaSchedule) TxsThrottleConfig() *TxsThrottleConfig { RecentTxDuration: ps.RecentTxDuration(), } } + +func (pangaeaSchedule) GetNetworkID() NetworkID { + return Pangaea +} diff --git a/internal/configs/sharding/shardingconfig.go b/internal/configs/sharding/shardingconfig.go index 06ec5cb43..f128c4265 100644 --- a/internal/configs/sharding/shardingconfig.go +++ b/internal/configs/sharding/shardingconfig.go @@ -56,6 +56,9 @@ type Schedule interface { // configuration for throttling pending transactions TxsThrottleConfig() *TxsThrottleConfig + + // GetNetworkID() return networkID type. + GetNetworkID() NetworkID } // Instance is one sharding configuration instance. diff --git a/internal/configs/sharding/testnet.go b/internal/configs/sharding/testnet.go index 6e20b21e4..f76a5a9e4 100644 --- a/internal/configs/sharding/testnet.go +++ b/internal/configs/sharding/testnet.go @@ -126,6 +126,10 @@ func (ts testnetSchedule) TxsThrottleConfig() *TxsThrottleConfig { } } +func (ts testnetSchedule) GetNetworkID() NetworkID { + return TestNet +} + var testnetReshardingEpoch = []*big.Int{big.NewInt(0), big.NewInt(testnetV1Epoch), big.NewInt(testnetV2Epoch)} var testnetV0 = MustNewInstance(2, 150, 150, genesis.TNHarmonyAccounts, genesis.TNFoundationalAccounts, testnetReshardingEpoch) diff --git a/node/node_cross_shard.go b/node/node_cross_shard.go index 35b5eebd0..313487028 100644 --- a/node/node_cross_shard.go +++ b/node/node_cross_shard.go @@ -115,6 +115,15 @@ func (node *Node) verifyIncomingReceipts(block *types.Block) error { return err } } + + incomingReceiptHash := types.EmptyRootHash + if len(cxps) > 0 { + incomingReceiptHash = types.DeriveSha(cxps) + } + if incomingReceiptHash != block.Header().IncomingReceiptHash { + return ctxerror.New("[verifyIncomingReceipts] Invalid IncomingReceiptHash in block header") + } + return nil } @@ -236,6 +245,7 @@ func (node *Node) ProposeCrossLinkDataForBeaconchain() (types.CrossLinks, error) utils.Logger().Error(). Err(err). Msgf("[CrossLink] Haven't received the first cross link %d", link.BlockNum().Uint64()) + break } else { err := node.VerifyCrosslinkHeader(lastLink.Header(), link.Header()) if err != nil { @@ -277,11 +287,7 @@ func (node *Node) ProcessReceiptMessage(msgPayload []byte) { return } - // TODO: check message signature is from the nodes of source shard. - - // TODO: remove in future if not useful - node.Blockchain().WriteCXReceipts(cxp.MerkleProof.ShardID, cxp.MerkleProof.BlockNum.Uint64(), cxp.MerkleProof.BlockHash, cxp.Receipts, true) - utils.Logger().Debug().Interface("cxp", cxp).Msg("[ProcessReceiptMessage] Add CXReceiptsProof to pending Receipts") + // TODO: integrate with txpool node.AddPendingReceipts(&cxp) } diff --git a/node/node_handler.go b/node/node_handler.go index 3ba0e405e..f73740e9b 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -355,6 +355,7 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) error { } // Verify cross links + // TODO: move into ValidateNewBlock if node.NodeConfig.ShardID == 0 { err := node.VerifyBlockCrossLinks(newBlock) if err != nil { @@ -363,6 +364,7 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) error { } } + // TODO: move into ValidateNewBlock err = node.verifyIncomingReceipts(newBlock) if err != nil { return ctxerror.New("[VerifyNewBlock] Cannot ValidateNewBlock", "blockHash", newBlock.Hash(), @@ -566,6 +568,7 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) { } // Update last consensus time for metrics + // TODO: randomly selected a few validators to broadcast messages instead of only leader broadcast node.lastConsensusTime = time.Now().Unix() if node.Consensus.PubKey.IsEqual(node.Consensus.LeaderPubKey) { if node.NodeConfig.ShardID == 0 { @@ -580,6 +583,7 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) { Msg("BINGO !!! Reached Consensus") } + // TODO chao: Write New checkpoint after clean node.Blockchain().CleanCXReceiptsCheckpointsByBlock(newBlock) if node.NodeConfig.GetNetworkType() != nodeconfig.Mainnet { diff --git a/node/worker/worker.go b/node/worker/worker.go index 24876ab14..261c6ee91 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -93,7 +93,7 @@ func (w *Worker) SelectTransactionsForNewBlock(newBlockNum uint64, txs types.Tra unselected := types.Transactions{} invalid := types.Transactions{} for _, tx := range txs { - if tx.ShardID() != w.shardID && tx.ToShardID() != w.shardID { + if tx.ShardID() != w.shardID { invalid = append(invalid, tx) continue } @@ -186,7 +186,10 @@ func (w *Worker) CommitReceipts(receiptsList []*types.CXReceiptsProof) error { } for _, cx := range receiptsList { - core.ApplyIncomingReceipt(w.config, w.current.state, w.current.header, cx) + err := core.ApplyIncomingReceipt(w.config, w.current.state, w.current.header, cx) + if err != nil { + return ctxerror.New("cannot apply receiptsList").WithCause(err) + } } for _, cx := range receiptsList {