Merge branch 'master' of github.com:harmony-one/harmony into version_headers

pull/1538/head
Eugene Kim 5 years ago
commit 2b6abc27f0
  1. api/proto/node/node.go (7)
  2. api/service/explorer/service.go (148)
  3. api/service/explorer/storage.go (6)
  4. api/service/explorer/structs.go (40)
  5. api/service/explorer/structs_test.go (3)
  6. api/service/syncing/syncing.go (5)
  7. consensus/consensus.go (2)
  8. consensus/consensus_v2.go (2)
  9. core/block_validator.go (110)
  10. core/blockchain.go (19)
  11. core/types/cx_receipt.go (64)
  12. node/node_cross_shard.go (161)
  13. node/node_handler.go (387)
  14. node/node_newblock.go (35)
  15. node/node_resharding.go (304)
  16. node/service_setup.go (9)

@ -189,9 +189,10 @@ func DeserializeEpochShardStateFromMessage(payload []byte) (*shard.EpochShardSta
return epochShardState, nil
}
// ConstructCXReceiptsProof constructs cross shard receipts and merkle proof
func ConstructCXReceiptsProof(cxs types.CXReceipts, mkp *types.CXMerkleProof) []byte {
msg := &types.CXReceiptsProof{Receipts: cxs, MerkleProof: mkp}
// ConstructCXReceiptsProof constructs cross shard receipts and the related proof, including
// the merkle proof, block header and commit signatures
func ConstructCXReceiptsProof(cxs types.CXReceipts, mkp *types.CXMerkleProof, header *block.Header, commitSig []byte, commitBitmap []byte) []byte {
msg := &types.CXReceiptsProof{Receipts: cxs, MerkleProof: mkp, Header: header, CommitSig: commitSig, CommitBitmap: commitBitmap}
byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})
byteBuffer.WriteByte(byte(Block))
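For context, the constructed message begins with two type bytes (proto.Node, then Block) followed by the serialized proof; the remainder of the serialization is not shown in this hunk. Below is a minimal framing sketch under that assumption, with placeholder byte values and a stand-in body:

package main

import (
	"bytes"
	"fmt"
)

// frameNodeBlockMessage wraps an already-serialized body with the two
// category/type prefix bytes, mirroring the byteBuffer usage above.
// The concrete byte values and the body encoding are assumptions here.
func frameNodeBlockMessage(protoNodeByte, blockTypeByte byte, body []byte) []byte {
	buf := bytes.NewBuffer([]byte{protoNodeByte})
	buf.WriteByte(blockTypeByte)
	buf.Write(body)
	return buf.Bytes()
}

func main() {
	body := []byte("serialized CXReceiptsProof goes here") // stand-in payload
	fmt.Printf("%x\n", frameNodeBlockMessage(0x01, 0x02, body))
}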

@ -32,6 +32,9 @@ import (
// Constants for explorer service.
const (
explorerPortDifference = 4000
paginationOffset = 10
txViewNone = "NONE"
txViewAll = "ALL"
)
// HTTPError is an HTTP error.
@ -102,7 +105,8 @@ func (s *Service) Run() *http.Server {
s.router = mux.NewRouter()
// Set up router for blocks.
s.router.Path("/blocks").Queries("from", "{[0-9]*?}", "to", "{[0-9]*?}").HandlerFunc(s.GetExplorerBlocks).Methods("GET")
// Blocks are divided into pages in consecutive groups of offset size.
s.router.Path("/blocks").Queries("from", "{[0-9]*?}", "to", "{[0-9]*?}", "page", "{[0-9]*?}", "offset", "{[0-9]*?}").HandlerFunc(s.GetExplorerBlocks).Methods("GET")
s.router.Path("/blocks").HandlerFunc(s.GetExplorerBlocks)
// Set up router for tx.
@ -110,7 +114,8 @@ func (s *Service) Run() *http.Server {
s.router.Path("/tx").HandlerFunc(s.GetExplorerTransaction)
// Set up router for address.
s.router.Path("/address").Queries("id", fmt.Sprintf("{([0-9A-Fa-fx]*?)|(t?one1[%s]{38})}", bech32.Charset)).HandlerFunc(s.GetExplorerAddress).Methods("GET")
// Address transactions are divided into pages in consecutive groups of offset size.
s.router.Path("/address").Queries("id", fmt.Sprintf("{([0-9A-Fa-fx]*?)|(t?one1[%s]{38})}", bech32.Charset), "tx_view", "{[A-Z]*?}", "page", "{[0-9]*?}", "offset", "{[0-9]*?}").HandlerFunc(s.GetExplorerAddress).Methods("GET")
s.router.Path("/address").HandlerFunc(s.GetExplorerAddress)
// Set up router for node count.
@ -122,8 +127,8 @@ func (s *Service) Run() *http.Server {
s.router.Path("/shard").HandlerFunc(s.GetExplorerShard)
// Set up router for committee.
s.router.Path("/committee").Queries("shard_id", "{[0-9]*?}", "epoch", "{[0-9]*?}").HandlerFunc(s.GetCommittee).Methods("GET")
s.router.Path("/committee").HandlerFunc(s.GetCommittee).Methods("GET")
s.router.Path("/committee").Queries("shard_id", "{[0-9]*?}", "epoch", "{[0-9]*?}").HandlerFunc(s.GetExplorerCommittee).Methods("GET")
s.router.Path("/committee").HandlerFunc(s.GetExplorerCommittee).Methods("GET")
// Do serving now.
utils.Logger().Info().Str("port", GetExplorerPort(s.Port)).Msg("Listening")
@ -165,7 +170,8 @@ func (s *Service) GetExplorerBlocks(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
from := r.FormValue("from")
to := r.FormValue("to")
pageParam := r.FormValue("page")
offsetParam := r.FormValue("offset")
data := &Data{
Blocks: []*Block{},
}
@ -176,12 +182,15 @@ func (s *Service) GetExplorerBlocks(w http.ResponseWriter, r *http.Request) {
}()
if from == "" {
utils.Logger().Warn().Msg("Missing from parameter")
w.WriteHeader(http.StatusBadRequest)
return
}
db := s.storage.GetDB()
fromInt, err := strconv.Atoi(from)
if err != nil {
utils.Logger().Warn().Err(err).Str("from", from).Msg("invalid from parameter")
utils.Logger().Warn().Err(err).Msg("invalid from parameter")
w.WriteHeader(http.StatusBadRequest)
return
}
var toInt int
@ -197,9 +206,32 @@ func (s *Service) GetExplorerBlocks(w http.ResponseWriter, r *http.Request) {
toInt, err = strconv.Atoi(to)
}
if err != nil {
utils.Logger().Warn().Err(err).Str("to", to).Msg("invalid to parameter")
utils.Logger().Warn().Err(err).Msg("invalid to parameter")
w.WriteHeader(http.StatusBadRequest)
return
}
var offset int
if offsetParam != "" {
offset, err = strconv.Atoi(offsetParam)
if err != nil || offset < 1 {
utils.Logger().Warn().Msg("invalid offset parameter")
w.WriteHeader(http.StatusBadRequest)
return
}
} else {
offset = paginationOffset
}
var page int
if pageParam != "" {
page, err = strconv.Atoi(pageParam)
if err != nil {
utils.Logger().Warn().Err(err).Msg("invalid page parameter")
w.WriteHeader(http.StatusBadRequest)
return
}
} else {
page = 0
}
accountBlocks := s.ReadBlocksFromDB(fromInt, toInt)
curEpoch := int64(-1)
@ -283,7 +315,12 @@ func (s *Service) GetExplorerBlocks(w http.ResponseWriter, r *http.Request) {
}
data.Blocks = append(data.Blocks, block)
}
return
paginatedBlocks := make([]*Block, 0)
for i := 0; i < offset && i+offset*page < len(data.Blocks); i++ {
paginatedBlocks = append(paginatedBlocks, data.Blocks[i+offset*page])
}
data.Blocks = paginatedBlocks
}
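The slicing above keeps at most offset blocks starting at index page*offset. Here is a standalone sketch of the same logic; making it generic over the element type is my addition, not part of this commit:

package main

import "fmt"

// paginate returns the page-th group of size offset from items,
// mirroring the loop used for data.Blocks above.
func paginate[T any](items []T, page, offset int) []T {
	out := make([]T, 0, offset)
	for i := 0; i < offset && i+offset*page < len(items); i++ {
		out = append(out, items[i+offset*page])
	}
	return out
}

func main() {
	nums := []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11}
	fmt.Println(paginate(nums, 0, 5)) // [0 1 2 3 4]
	fmt.Println(paginate(nums, 2, 5)) // [10 11]
}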
// GetExplorerTransaction serves the /tx end-point.
@ -298,24 +335,28 @@ func (s *Service) GetExplorerTransaction(w http.ResponseWriter, r *http.Request)
}
}()
if id == "" {
utils.Logger().Warn().Msg("invalid id parameter")
w.WriteHeader(http.StatusBadRequest)
return
}
db := s.storage.GetDB()
bytes, err := db.Get([]byte(GetTXKey(id)))
if err != nil {
utils.Logger().Warn().Err(err).Str("id", id).Msg("cannot read TX")
w.WriteHeader(http.StatusInternalServerError)
return
}
tx := new(Transaction)
if rlp.DecodeBytes(bytes, tx) != nil {
utils.Logger().Warn().Str("id", id).Msg("cannot convert data from DB")
w.WriteHeader(http.StatusInternalServerError)
return
}
data.TX = *tx
}
// GetCommittee serves the /committee end-point.
func (s *Service) GetCommittee(w http.ResponseWriter, r *http.Request) {
// GetExplorerCommittee serves the /committee end-point.
func (s *Service) GetExplorerCommittee(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
shardIDRead := r.FormValue("shard_id")
epochRead := r.FormValue("epoch")
@ -326,7 +367,7 @@ func (s *Service) GetCommittee(w http.ResponseWriter, r *http.Request) {
shardID, err = strconv.ParseUint(shardIDRead, 10, 32)
if err != nil {
utils.Logger().Warn().Err(err).Msg("cannot read shard id")
w.WriteHeader(400)
w.WriteHeader(http.StatusBadRequest)
return
}
}
@ -334,13 +375,13 @@ func (s *Service) GetCommittee(w http.ResponseWriter, r *http.Request) {
epoch, err = strconv.ParseUint(epochRead, 10, 64)
if err != nil {
utils.Logger().Warn().Err(err).Msg("cannot read shard epoch")
w.WriteHeader(400)
w.WriteHeader(http.StatusBadRequest)
return
}
}
if s.ShardID != uint32(shardID) {
utils.Logger().Warn().Msg("incorrect shard id")
w.WriteHeader(400)
w.WriteHeader(http.StatusBadRequest)
return
}
// fetch current epoch if epoch is 0
@ -350,7 +391,7 @@ func (s *Service) GetCommittee(w http.ResponseWriter, r *http.Request) {
blockHeight, err := strconv.Atoi(string(bytes))
if err != nil {
utils.Logger().Warn().Err(err).Msg("cannot decode block height from DB")
w.WriteHeader(500)
w.WriteHeader(http.StatusInternalServerError)
return
}
key := GetBlockKey(blockHeight)
@ -358,7 +399,7 @@ func (s *Service) GetCommittee(w http.ResponseWriter, r *http.Request) {
block := new(types.Block)
if rlp.DecodeBytes(data, block) != nil {
utils.Logger().Warn().Err(err).Msg("cannot get block from db")
w.WriteHeader(500)
w.WriteHeader(http.StatusInternalServerError)
return
}
epoch = block.Epoch().Uint64()
@ -366,13 +407,13 @@ func (s *Service) GetCommittee(w http.ResponseWriter, r *http.Request) {
bytes, err := db.Get([]byte(GetCommitteeKey(uint32(shardID), epoch)))
if err != nil {
utils.Logger().Warn().Err(err).Msg("cannot read committee")
w.WriteHeader(500)
w.WriteHeader(http.StatusInternalServerError)
return
}
committee := &shard.Committee{}
if err := rlp.DecodeBytes(bytes, committee); err != nil {
utils.Logger().Warn().Err(err).Msg("cannot decode committee data from DB")
w.WriteHeader(500)
w.WriteHeader(http.StatusInternalServerError)
return
}
validators := &Committee{}
@ -390,7 +431,7 @@ func (s *Service) GetCommittee(w http.ResponseWriter, r *http.Request) {
}
if err := json.NewEncoder(w).Encode(validators); err != nil {
utils.Logger().Warn().Err(err).Msg("cannot JSON-encode committee")
w.WriteHeader(500)
w.WriteHeader(http.StatusInternalServerError)
}
}
@ -399,25 +440,59 @@ func (s *Service) GetExplorerAddress(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
id := r.FormValue("id")
key := GetAddressKey(id)
utils.Logger().Info().Str("address", id).Msg("Querying address")
txViewParam := r.FormValue("tx_view")
pageParam := r.FormValue("page")
offsetParam := r.FormValue("offset")
txView := txViewNone
if txViewParam != "" {
txView = txViewParam
}
utils.Logger().Info().Str("Address", id).Msg("Querying address")
data := &Data{}
defer func() {
if err := json.NewEncoder(w).Encode(data.Address); err != nil {
ctxerror.Warn(utils.WithCallerSkip(utils.GetLogInstance(), 1), err,
"cannot JSON-encode address")
"cannot JSON-encode Address")
}
}()
if id == "" {
utils.Logger().Warn().Msg("missing address id param")
w.WriteHeader(http.StatusBadRequest)
return
}
var err error
var offset int
if offsetParam != "" {
offset, err = strconv.Atoi(offsetParam)
if err != nil || offset < 1 {
utils.Logger().Warn().Msg("invalid offset parameter")
w.WriteHeader(http.StatusBadRequest)
return
}
} else {
offset = paginationOffset
}
var page int
if pageParam != "" {
page, err = strconv.Atoi(pageParam)
if err != nil {
utils.Logger().Warn().Err(err).Msg("invalid page parameter")
w.WriteHeader(http.StatusBadRequest)
return
}
} else {
page = 0
}
data.Address.ID = id
// Try to populate the balance by directly calling get balance.
// Check the balance from blockchain rather than local DB dump
balanceAddr := big.NewInt(0)
if s.GetAccountBalance != nil {
address := common2.ParseAddr(id)
balance, err := s.GetAccountBalance(address)
if err == nil {
balanceAddr = balance
data.Address.Balance = balance
}
}
@ -430,8 +505,37 @@ func (s *Service) GetExplorerAddress(w http.ResponseWriter, r *http.Request) {
}
if err = rlp.DecodeBytes(bytes, &data.Address); err != nil {
utils.Logger().Warn().Str("id", id).Msg("cannot convert data from DB")
w.WriteHeader(http.StatusInternalServerError)
return
}
if balanceAddr.Cmp(big.NewInt(0)) != 0 {
data.Address.Balance = balanceAddr
}
switch txView {
case txViewNone:
data.Address.TXs = nil
case Received:
receivedTXs := make([]*Transaction, 0)
for _, tx := range data.Address.TXs {
if tx.Type == Received {
receivedTXs = append(receivedTXs, tx)
}
}
data.Address.TXs = receivedTXs
case Sent:
sentTXs := make([]*Transaction, 0)
for _, tx := range data.Address.TXs {
if tx.Type == Sent {
sentTXs = append(sentTXs, tx)
}
}
data.Address.TXs = sentTXs
}
paginatedTXs := make([]*Transaction, 0)
for i := 0; i < offset && i+offset*page < len(data.Address.TXs); i++ {
paginatedTXs = append(paginatedTXs, data.Address.TXs[i+offset*page])
}
data.Address.TXs = paginatedTXs
}
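The tx_view handling above first filters the address's transactions by direction (NONE drops them all, RECEIVED or SENT keeps one direction) and then applies the same page/offset slicing. A minimal self-contained sketch of the filter step, using stand-in types (the real Transaction struct and constants live in this package):

package main

import "fmt"

const (
	received = "RECEIVED"
	sent     = "SENT"
)

type tx struct {
	ID   string
	Type string
}

// filterByType keeps only transactions whose Type matches want,
// mirroring the switch over txView above.
func filterByType(txs []*tx, want string) []*tx {
	out := make([]*tx, 0, len(txs))
	for _, t := range txs {
		if t.Type == want {
			out = append(out, t)
		}
	}
	return out
}

func main() {
	txs := []*tx{{"a", received}, {"b", sent}, {"c", received}}
	for _, t := range filterByType(txs, received) {
		fmt.Println(t.ID) // a, then c
	}
}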
// GetExplorerNodeCount serves /nodes end-point.
@ -439,6 +543,7 @@ func (s *Service) GetExplorerNodeCount(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(len(s.GetNodeIDs())); err != nil {
utils.Logger().Warn().Msg("cannot JSON-encode node count")
w.WriteHeader(http.StatusInternalServerError)
}
}
@ -453,6 +558,7 @@ func (s *Service) GetExplorerShard(w http.ResponseWriter, r *http.Request) {
}
if err := json.NewEncoder(w).Encode(Shard{Nodes: nodes}); err != nil {
utils.Logger().Warn().Msg("cannot JSON-encode shard info")
w.WriteHeader(http.StatusInternalServerError)
}
}

@ -156,11 +156,13 @@ func (storage *Storage) UpdateTXStorage(batch ethdb.Batch, explorerTransaction *
// UpdateAddress ...
func (storage *Storage) UpdateAddress(batch ethdb.Batch, explorerTransaction *Transaction, tx *types.Transaction) {
explorerTransaction.Type = Received
storage.UpdateAddressStorage(batch, explorerTransaction.To, explorerTransaction, tx)
explorerTransaction.Type = Sent
storage.UpdateAddressStorage(batch, explorerTransaction.From, explorerTransaction, tx)
}
// UpdateAddressStorage updates specific addr address.
// UpdateAddressStorage updates specific addr Address.
func (storage *Storage) UpdateAddressStorage(batch ethdb.Batch, addr string, explorerTransaction *Transaction, tx *types.Transaction) {
key := GetAddressKey(addr)
@ -183,6 +185,6 @@ func (storage *Storage) UpdateAddressStorage(batch ethdb.Batch, addr string, exp
utils.Logger().Warn().Err(err).Msg("cannot batch address")
}
} else {
utils.Logger().Error().Err(err).Msg("cannot encode address account")
utils.Logger().Error().Err(err).Msg("cannot encode address")
}
}

@ -5,7 +5,10 @@ import (
"math/big"
"strconv"
"github.com/ethereum/go-ethereum/common"
"github.com/harmony-one/harmony/core/types"
common2 "github.com/harmony-one/harmony/internal/common"
"github.com/harmony-one/harmony/internal/utils"
)
@ -13,11 +16,17 @@ import (
* All the code here is work in progress for the sprint.
*/
// Tx types ...
const (
Received = "RECEIVED"
Sent = "SENT"
)
// Data ...
type Data struct {
Blocks []*Block `json:"blocks"`
// Block Block `json:"block"`
Address Address `json:"address"`
Address Address `json:"Address"`
TX Transaction
}
@ -48,6 +57,7 @@ type Transaction struct {
Value *big.Int `json:"value"`
Bytes string `json:"bytes"`
Data string `json:"data"`
Type string `json:"type"`
}
// Block ...
@ -86,21 +96,6 @@ type Shard struct {
// NewBlock ...
func NewBlock(block *types.Block, height int) *Block {
// TODO(ricl): use block.Header().CommitBitmap and GetPubKeyFromMask
signers := []string{}
/*state, err := block.Header().GetShardState()
if err == nil {
for _, committee := range state {
if committee.ShardID == block.ShardID() {
for i, validator := range committee.NodeList {
oneAddress, err := common.AddressToBech32(validator.EcdsaAddress)
if err != nil && block.Header().LastCommitBitmap[i] != 0x0 {
continue
}
signers = append(signers, oneAddress)
}
}
}
}*/
return &Block{
Height: strconv.Itoa(height),
ID: block.Hash().Hex(),
@ -108,28 +103,29 @@ func NewBlock(block *types.Block, height int) *Block {
Timestamp: strconv.Itoa(int(block.Time().Int64() * 1000)),
MerkleRoot: block.Root().Hex(),
Bytes: strconv.Itoa(int(block.Size())),
Signers: signers,
Signers: []string{},
Epoch: block.Epoch().Uint64(),
ExtraData: string(block.Extra()),
}
}
// GetTransaction ...
func GetTransaction(tx *types.Transaction, accountBlock *types.Block) *Transaction {
func GetTransaction(tx *types.Transaction, addressBlock *types.Block) *Transaction {
if tx.To() == nil {
return nil
}
msg, err := tx.AsMessage(types.HomesteadSigner{})
msg, err := tx.AsMessage(types.NewEIP155Signer(tx.ChainID()))
if err != nil {
utils.Logger().Error().Err(err).Msg("Error when parsing tx into message")
}
return &Transaction{
ID: tx.Hash().Hex(),
Timestamp: strconv.Itoa(int(accountBlock.Time().Int64() * 1000)),
From: msg.From().Hex(), // TODO ek – use bech32
To: msg.To().Hex(), // TODO ek – use bech32
Timestamp: strconv.Itoa(int(addressBlock.Time().Int64() * 1000)),
From: common2.MustAddressToBech32(common.HexToAddress(msg.From().Hex())),
To: common2.MustAddressToBech32(common.HexToAddress(msg.To().Hex())),
Value: msg.Value(),
Bytes: strconv.Itoa(int(tx.Size())),
Data: hex.EncodeToString(tx.Data()),
Type: "",
}
}
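GetTransaction now renders From and To as bech32 one-addresses via MustAddressToBech32. A small hedged example of that conversion path; the address value is arbitrary, and the import aliases match the ones used above:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	common2 "github.com/harmony-one/harmony/internal/common"
)

func main() {
	// Arbitrary 20-byte address, used only for illustration.
	hexAddr := common.HexToAddress("0x1234567890abcdef1234567890abcdef12345678")
	// By its Must naming, MustAddressToBech32 is expected to panic on failure;
	// the handler above relies on well-formed addresses coming out of AsMessage.
	fmt.Println(common2.MustAddressToBech32(hexAddr))
}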

@ -11,6 +11,7 @@ import (
"github.com/harmony-one/harmony/block/factory"
"github.com/harmony-one/harmony/core/types"
common2 "github.com/harmony-one/harmony/internal/common"
)
// Test for GetBlockInfoKey
@ -24,6 +25,6 @@ func TestGetTransaction(t *testing.T) {
tx := GetTransaction(tx1, block)
assert.Equal(t, tx.ID, tx1.Hash().Hex(), "should be equal tx1.Hash()")
assert.Equal(t, tx.To, tx1.To().Hex(), "should be equal tx1.To()") // TODO ek – use bech32
assert.Equal(t, tx.To, common2.MustAddressToBech32(common.HexToAddress(tx1.To().Hex())), "should be equal tx1.To()")
assert.Equal(t, tx.Bytes, strconv.Itoa(int(tx1.Size())), "should be equal tx1.Size()")
}

@ -538,8 +538,9 @@ func (ss *StateSync) updateBlockAndStatus(block *types.Block, bc *core.BlockChai
utils.Logger().Info().Str("blockHex", bc.CurrentBlock().Hash().Hex()).Msg("[SYNC] Current Block")
// Verify block signatures
// TODO chao: only when the block is verified against the last commit sigs can we update the block and status
if block.NumberU64() > 1 {
err := core.VerifyBlockLastCommitSigs(bc, block)
err := core.VerifyBlockLastCommitSigs(bc, block.Header())
if err != nil {
utils.Logger().Error().Err(err).Msgf("[SYNC] failed verifying signatures for new block %d", block.NumberU64())
return false
@ -695,7 +696,7 @@ func (ss *StateSync) getMaxPeerHeight(isBeacon bool) uint64 {
go func() {
defer wg.Done()
//debug
utils.Logger().Debug().Bool("isBeacon", isBeacon).Str("IP", peerConfig.ip).Str("Port", peerConfig.port).Msg("[Sync]getMaxPeerHeight")
// utils.Logger().Debug().Bool("isBeacon", isBeacon).Str("IP", peerConfig.ip).Str("Port", peerConfig.port).Msg("[Sync]getMaxPeerHeight")
response, err := peerConfig.client.GetBlockChainHeight()
if err != nil {
utils.Logger().Warn().Err(err).Str("IP", peerConfig.ip).Str("Port", peerConfig.port).Msg("[Sync]GetBlockChainHeight failed")

@ -127,7 +127,7 @@ type Consensus struct {
ReadySignal chan struct{}
// The post-consensus processing func passed from Node object
// Called when consensus on a new block is done
OnConsensusDone func(*types.Block)
OnConsensusDone func(*types.Block, []byte)
// The verifier func passed from Node object
BlockVerifier func(*types.Block) error

@ -985,7 +985,7 @@ func (consensus *Consensus) tryCatchup() {
consensus.LeaderPubKey = msgs[0].SenderPubkey
consensus.getLogger().Info().Msg("[TryCatchup] Adding block to chain")
consensus.OnConsensusDone(block)
consensus.OnConsensusDone(block, msgs[0].Payload)
consensus.ResetState()
select {
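OnConsensusDone now carries the commit payload (aggregate signature plus bitmap) alongside the block, and node_handler.go below changes PostConsensusProcessing to the matching signature. A sketch of how such a callback is wired, with placeholder types standing in for types.Block and the consensus object:

package main

import "fmt"

// Placeholder stand-ins, used only to show the two-argument callback shape.
type block struct{ number uint64 }

type consensus struct {
	// Matches the new field type: func(*types.Block, []byte)
	OnConsensusDone func(*block, []byte)
}

func main() {
	c := &consensus{}
	// In the node this is node.PostConsensusProcessing(newBlock, commitSigAndBitmap).
	c.OnConsensusDone = func(b *block, commitSigAndBitmap []byte) {
		fmt.Printf("block %d committed, payload %d bytes\n", b.number, len(commitSigAndBitmap))
	}
	c.OnConsensusDone(&block{number: 42}, make([]byte, 100))
}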

@ -17,13 +17,17 @@
package core
import (
"bytes"
"encoding/binary"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/harmony-one/bls/ffi/go/bls"
bls2 "github.com/harmony-one/harmony/crypto/bls"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/block"
consensus_engine "github.com/harmony-one/harmony/consensus/engine"
"github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types"
@ -109,19 +113,18 @@ func (v *BlockValidator) ValidateState(block, parent *types.Block, statedb *stat
return nil
}
// VerifyBlockLastCommitSigs verifies the last commit sigs of the block
func VerifyBlockLastCommitSigs(bc *BlockChain, block *types.Block) error {
header := block.Header()
parentBlock := bc.GetBlockByNumber(block.NumberU64() - 1)
if parentBlock == nil {
return ctxerror.New("[VerifyNewBlock] Failed to get parent block", "shardID", header.ShardID(), "blockNum", header.Number())
// VerifyHeaderWithSignature verifies the header with corresponding commit sigs
func VerifyHeaderWithSignature(header *block.Header, commitSig []byte, commitBitmap []byte) error {
if header == nil || len(commitSig) != 96 || len(commitBitmap) == 0 {
return ctxerror.New("[VerifyHeaderWithSignature] Invalid header/commitSig/commitBitmap", "header", header, "commitSigLen", len(commitSig), "commitBitmapLen", len(commitBitmap))
}
parentHeader := parentBlock.Header()
shardState, err := bc.ReadShardState(parentHeader.Epoch())
committee := shardState.FindCommitteeByID(parentHeader.ShardID())
if err != nil || committee == nil {
return ctxerror.New("[VerifyNewBlock] Failed to read shard state for cross link header", "shardID", header.ShardID(), "blockNum", header.Number()).WithCause(err)
shardState := GetShardState(header.Epoch())
committee := shardState.FindCommitteeByID(header.ShardID())
var err error
if committee == nil {
return ctxerror.New("[VerifyHeaderWithSignature] Failed to read shard state", "shardID", header.ShardID(), "blockNum", header.Number())
}
var committerKeys []*bls.PublicKey
@ -136,34 +139,46 @@ func VerifyBlockLastCommitSigs(bc *BlockChain, block *types.Block) error {
committerKeys = append(committerKeys, committerKey)
}
if !parseKeysSuccess {
return ctxerror.New("[VerifyNewBlock] cannot convert BLS public key", "shardID", header.ShardID(), "blockNum", header.Number()).WithCause(err)
return ctxerror.New("[VerifyBlockWithSignature] cannot convert BLS public key", "shardID", header.ShardID(), "blockNum", header.Number()).WithCause(err)
}
mask, err := bls2.NewMask(committerKeys, nil)
if err != nil {
return ctxerror.New("[VerifyNewBlock] cannot create group sig mask", "shardID", header.ShardID(), "blockNum", header.Number()).WithCause(err)
return ctxerror.New("[VerifyHeaderWithSignature] cannot create group sig mask", "shardID", header.ShardID(), "blockNum", header.Number()).WithCause(err)
}
if err := mask.SetMask(header.LastCommitBitmap()); err != nil {
return ctxerror.New("[VerifyNewBlock] cannot set group sig mask bits", "shardID", header.ShardID(), "blockNum", header.Number()).WithCause(err)
if err := mask.SetMask(commitBitmap); err != nil {
return ctxerror.New("[VerifyHeaderWithSignature] cannot set group sig mask bits", "shardID", header.ShardID(), "blockNum", header.Number()).WithCause(err)
}
aggSig := bls.Sign{}
lastCommitSig := header.LastCommitSignature()
err = aggSig.Deserialize(lastCommitSig[:])
err = aggSig.Deserialize(commitSig[:])
if err != nil {
return ctxerror.New("[VerifyNewBlock] unable to deserialize multi-signature from payload").WithCause(err)
}
blockNumBytes := make([]byte, 8)
binary.LittleEndian.PutUint64(blockNumBytes, header.Number().Uint64()-1)
parentHash := header.ParentHash()
commitPayload := append(blockNumBytes, parentHash[:]...)
binary.LittleEndian.PutUint64(blockNumBytes, header.Number().Uint64())
hash := header.Hash()
commitPayload := append(blockNumBytes, hash[:]...)
if !aggSig.VerifyHash(mask.AggregatePublic, commitPayload) {
return ctxerror.New("[VerifyNewBlock] Failed to verify the signature for last commit sig", "shardID", header.ShardID(), "blockNum", header.Number())
return ctxerror.New("[VerifyHeaderWithSignature] Failed to verify the signature for last commit sig", "shardID", header.ShardID(), "blockNum", header.Number())
}
return nil
}
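The verification above checks the aggregate BLS signature over an 8-byte little-endian block number concatenated with the 32-byte header hash. A self-contained sketch of just that payload construction (the zeroed hash is a placeholder for header.Hash()):

package main

import (
	"encoding/binary"
	"fmt"
)

// commitPayload reproduces the byte layout verified above:
// 8-byte little-endian block number followed by the 32-byte block hash.
func commitPayload(blockNum uint64, blockHash [32]byte) []byte {
	blockNumBytes := make([]byte, 8)
	binary.LittleEndian.PutUint64(blockNumBytes, blockNum)
	return append(blockNumBytes, blockHash[:]...)
}

func main() {
	var hash [32]byte // placeholder for header.Hash()
	payload := commitPayload(10, hash)
	fmt.Println(len(payload)) // 40 bytes: 8 (number) + 32 (hash)
}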
// VerifyBlockLastCommitSigs verifies the last commit sigs of the block
func VerifyBlockLastCommitSigs(bc *BlockChain, header *block.Header) error {
parentBlock := bc.GetBlockByNumber(header.Number().Uint64() - 1)
if parentBlock == nil {
return ctxerror.New("[VerifyBlockLastCommitSigs] Failed to get parent block", "shardID", header.ShardID(), "blockNum", header.Number())
}
parentHeader := parentBlock.Header()
lastCommitSig := header.LastCommitSignature()
lastCommitBitmap := header.LastCommitBitmap()
return VerifyHeaderWithSignature(parentHeader, lastCommitSig[:], lastCommitBitmap)
}
// CalcGasLimit computes the gas limit of the next block after parent. It aims
// to keep the baseline gas above the provided floor, and increase it towards the
// ceil if the blocks are full. If the ceil is exceeded, it will always decrease
@ -200,3 +215,56 @@ func CalcGasLimit(parent *types.Block, gasFloor, gasCeil uint64) uint64 {
}
return limit
}
// IsValidCXReceiptsProof checks whether the given CXReceiptsProof is consistent with itself
func IsValidCXReceiptsProof(cxp *types.CXReceiptsProof) error {
toShardID, err := cxp.GetToShardID()
if err != nil {
return ctxerror.New("[IsValidCXReceiptsProof] invalid shardID").WithCause(err)
}
merkleProof := cxp.MerkleProof
shardRoot := common.Hash{}
foundMatchingShardID := false
byteBuffer := bytes.NewBuffer([]byte{})
// prepare to calculate source shard outgoing cxreceipts root hash
for j := 0; j < len(merkleProof.ShardIDs); j++ {
sKey := make([]byte, 4)
binary.BigEndian.PutUint32(sKey, merkleProof.ShardIDs[j])
byteBuffer.Write(sKey)
byteBuffer.Write(merkleProof.CXShardHashes[j][:])
if merkleProof.ShardIDs[j] == toShardID {
shardRoot = merkleProof.CXShardHashes[j]
foundMatchingShardID = true
}
}
if !foundMatchingShardID {
return ctxerror.New("[IsValidCXReceiptsProof] Didn't find matching shardID")
}
sourceShardID := merkleProof.ShardID
sourceBlockNum := merkleProof.BlockNum
sha := types.DeriveSha(cxp.Receipts)
// (1) verify the CXReceipts trie root match
if sha != shardRoot {
return ctxerror.New("[IsValidCXReceiptsProof] Trie Root of ReadCXReceipts Not Match", "sourceShardID", sourceShardID, "sourceBlockNum", sourceBlockNum, "calculated", sha, "got", shardRoot)
}
// (2) verify the outgoingCXReceiptsHash match
outgoingHashFromSourceShard := crypto.Keccak256Hash(byteBuffer.Bytes())
if outgoingHashFromSourceShard != merkleProof.CXReceiptHash {
return ctxerror.New("[IsValidCXReceiptsProof] IncomingReceiptRootHash from source shard not match", "sourceShardID", sourceShardID, "sourceBlockNum", sourceBlockNum, "calculated", outgoingHashFromSourceShard, "got", merkleProof.CXReceiptHash)
}
// (3) verify the block hash matches
if cxp.Header.Hash() != merkleProof.BlockHash || cxp.Header.OutgoingReceiptHash() != merkleProof.CXReceiptHash {
return ctxerror.New("[IsValidCXReceiptsProof] BlockHash or OutgoingReceiptHash not match in block Header", "blockHash", cxp.Header.Hash(), "merkleProofBlockHash", merkleProof.BlockHash, "headerOutReceiptHash", cxp.Header.OutgoingReceiptHash(), "merkleOutReceiptHash", merkleProof.CXReceiptHash)
}
// (4) verify signatures of blockHeader
return VerifyHeaderWithSignature(cxp.Header, cxp.CommitSig, cxp.CommitBitmap)
}
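Check (2) above recomputes the source shard's outgoing receipts hash by concatenating, for every destination shard, a 4-byte big-endian shard ID with that shard's receipts root and hashing the buffer with Keccak256. A minimal sketch of that computation using go-ethereum's crypto package (the shard IDs and roots are made up):

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
)

// outgoingReceiptsHash mirrors the loop in IsValidCXReceiptsProof: for each
// shard, write the 4-byte big-endian shard ID and the 32-byte receipts root,
// then Keccak256 the whole buffer.
func outgoingReceiptsHash(shardIDs []uint32, roots []common.Hash) common.Hash {
	buf := bytes.NewBuffer([]byte{})
	for i := range shardIDs {
		sKey := make([]byte, 4)
		binary.BigEndian.PutUint32(sKey, shardIDs[i])
		buf.Write(sKey)
		buf.Write(roots[i][:])
	}
	return crypto.Keccak256Hash(buf.Bytes())
}

func main() {
	// Made-up example: two destination shards with placeholder roots.
	roots := []common.Hash{common.BytesToHash([]byte{1}), common.BytesToHash([]byte{2})}
	fmt.Println(outgoingReceiptsHash([]uint32{1, 2}, roots).Hex())
}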

@ -2166,39 +2166,38 @@ func (bc *BlockChain) NextCXReceiptsCheckpoint(currentNum uint64, shardID uint32
// the new checkpoint will not exceed currentNum+1
for num := lastCheckpoint; num <= currentNum+1; num++ {
newCheckpoint = num
by, _ := rawdb.ReadCXReceiptsProofSpent(bc.db, shardID, num)
if by == rawdb.NAByte {
// TODO chao: check if there is IncomingReceiptsHash in crosslink header
// if the rootHash is non-empty, it means incomingReceipts are not delivered
// otherwise, it means there is no cross-shard transactions for this block
newCheckpoint = num
continue
}
if by == rawdb.SpentByte {
newCheckpoint = num
continue
}
// the first unspent blockHash found, break the loop
newCheckpoint = num
break
}
return newCheckpoint
}
// cleanCXReceiptsCheckpoints will update the checkpoint and clean spent receipts up to the checkpoint
func (bc *BlockChain) cleanCXReceiptsCheckpoints(shardID uint32, currentNum uint64) {
// updateCXReceiptsCheckpoints will update the checkpoint and clean spent receipts up to the checkpoint
func (bc *BlockChain) updateCXReceiptsCheckpoints(shardID uint32, currentNum uint64) {
lastCheckpoint, err := rawdb.ReadCXReceiptsProofUnspentCheckpoint(bc.db, shardID)
if err != nil {
utils.Logger().Warn().Msg("[cleanCXReceiptsCheckpoints] Cannot get lastCheckpoint")
utils.Logger().Warn().Msg("[updateCXReceiptsCheckpoints] Cannot get lastCheckpoint")
}
newCheckpoint := bc.NextCXReceiptsCheckpoint(currentNum, shardID)
if lastCheckpoint == newCheckpoint {
return
}
utils.Logger().Debug().Uint64("lastCheckpoint", lastCheckpoint).Uint64("newCheckpont", newCheckpoint).Msg("[CleanCXReceiptsCheckpoints]")
utils.Logger().Debug().Uint64("lastCheckpoint", lastCheckpoint).Uint64("newCheckpont", newCheckpoint).Msg("[updateCXReceiptsCheckpoints]")
for num := lastCheckpoint; num < newCheckpoint; num++ {
rawdb.DeleteCXReceiptsProofSpent(bc.db, shardID, num)
}
rawdb.WriteCXReceiptsProofUnspentCheckpoint(bc.db, shardID, newCheckpoint)
}
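NextCXReceiptsCheckpoint walks from the last checkpoint forward: blocks marked NA or Spent advance the candidate checkpoint, and the scan stops at the first unspent block. A simplified in-memory model of that scan (the real code reads per-block status bytes via rawdb; the map and the byte constants here are stand-ins):

package main

import "fmt"

const (
	naByte    = byte('N') // stand-in for rawdb.NAByte
	spentByte = byte('S') // stand-in for rawdb.SpentByte
)

// nextCheckpoint models NextCXReceiptsCheckpoint over an in-memory status map.
// Blocks absent from the map are treated as unspent.
func nextCheckpoint(status map[uint64]byte, lastCheckpoint, currentNum uint64) uint64 {
	newCheckpoint := lastCheckpoint
	for num := lastCheckpoint; num <= currentNum+1; num++ {
		newCheckpoint = num
		if by, ok := status[num]; ok && (by == naByte || by == spentByte) {
			continue
		}
		// first unspent block found: the scan stops here
		break
	}
	return newCheckpoint
}

func main() {
	status := map[uint64]byte{5: spentByte, 6: naByte}
	fmt.Println(nextCheckpoint(status, 5, 10)) // 7: blocks 5 and 6 are cleanable
}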
// WriteCXReceiptsProofSpent mark the CXReceiptsProof list with given unspent status
@ -2220,8 +2219,8 @@ func (bc *BlockChain) IsSpent(cxp *types.CXReceiptsProof) bool {
return false
}
// CleanCXReceiptsCheckpointsByBlock cleans checkpoints based on incomingReceipts of the given block
func (bc *BlockChain) CleanCXReceiptsCheckpointsByBlock(block *types.Block) {
// UpdateCXReceiptsCheckpointsByBlock cleans checkpoints and updates the latest checkpoint based on the incomingReceipts of the given block
func (bc *BlockChain) UpdateCXReceiptsCheckpointsByBlock(block *types.Block) {
m := make(map[uint32]uint64)
for _, cxp := range block.IncomingReceipts() {
shardID := cxp.MerkleProof.ShardID
@ -2235,6 +2234,6 @@ func (bc *BlockChain) CleanCXReceiptsCheckpointsByBlock(block *types.Block) {
for k, v := range m {
utils.Logger().Debug().Uint32("shardID", k).Uint64("blockNum", v).Msg("[CleanCXReceiptsCheckpoints] Cleaning CXReceiptsProof upto")
bc.cleanCXReceiptsCheckpoints(k, v)
bc.updateCXReceiptsCheckpoints(k, v)
}
}

@ -1,14 +1,12 @@
package types
import (
"bytes"
"encoding/binary"
"math/big"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/internal/ctxerror"
)
@ -62,11 +60,6 @@ func (cs CXReceipts) MaxToShardID() uint32 {
return maxShardID
}
// NewCrossShardReceipt creates a cross shard receipt
func NewCrossShardReceipt(txHash common.Hash, from common.Address, to *common.Address, shardID uint32, toShardID uint32, amount *big.Int) *CXReceipt {
return &CXReceipt{TxHash: txHash, From: from, To: to, ShardID: shardID, ToShardID: toShardID, Amount: amount}
}
// CXMerkleProof represents the merkle proof of a collection of ordered cross shard transactions
type CXMerkleProof struct {
BlockNum *big.Int // blockNumber of source shard
@ -79,8 +72,11 @@ type CXMerkleProof struct {
// CXReceiptsProof carries the cross shard receipts and merkle proof
type CXReceiptsProof struct {
Receipts CXReceipts
MerkleProof *CXMerkleProof
Receipts CXReceipts
MerkleProof *CXMerkleProof
Header *block.Header
CommitSig []byte
CommitBitmap []byte
}
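On the receiving side (ProcessReceiptMessage further down in this diff), the payload is RLP-decoded back into this struct and then validated with core.IsValidCXReceiptsProof. A hedged sketch of that decode-and-validate step as a free function, assuming the harmony module is available and with error handling simplified:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/rlp"

	"github.com/harmony-one/harmony/core"
	"github.com/harmony-one/harmony/core/types"
)

// decodeAndValidateCXProof decodes an RLP payload into a CXReceiptsProof and
// runs the self-consistency checks from core.IsValidCXReceiptsProof.
func decodeAndValidateCXProof(msgPayload []byte) (*types.CXReceiptsProof, error) {
	cxp := types.CXReceiptsProof{}
	if err := rlp.DecodeBytes(msgPayload, &cxp); err != nil {
		return nil, fmt.Errorf("cannot decode CXReceiptsProof: %w", err)
	}
	if err := core.IsValidCXReceiptsProof(&cxp); err != nil {
		return nil, fmt.Errorf("invalid CXReceiptsProof: %w", err)
	}
	return &cxp, nil
}

func main() {
	if _, err := decodeAndValidateCXProof(nil); err != nil {
		fmt.Println(err) // an empty payload does not decode
	}
}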
// CXReceiptsProofs is a list of CXReceiptsProof
@ -130,51 +126,3 @@ func (cxp *CXReceiptsProof) GetToShardID() (uint32, error) {
}
return shardID, nil
}
// IsValidCXReceiptsProof checks whether the given CXReceiptsProof is consistent with itself
func (cxp *CXReceiptsProof) IsValidCXReceiptsProof() error {
toShardID, err := cxp.GetToShardID()
if err != nil {
return ctxerror.New("[IsValidCXReceiptsProof] invalid shardID").WithCause(err)
}
merkleProof := cxp.MerkleProof
shardRoot := common.Hash{}
foundMatchingShardID := false
byteBuffer := bytes.NewBuffer([]byte{})
// prepare to calculate source shard outgoing cxreceipts root hash
for j := 0; j < len(merkleProof.ShardIDs); j++ {
sKey := make([]byte, 4)
binary.BigEndian.PutUint32(sKey, merkleProof.ShardIDs[j])
byteBuffer.Write(sKey)
byteBuffer.Write(merkleProof.CXShardHashes[j][:])
if merkleProof.ShardIDs[j] == toShardID {
shardRoot = merkleProof.CXShardHashes[j]
foundMatchingShardID = true
}
}
if !foundMatchingShardID {
return ctxerror.New("[IsValidCXReceiptsProof] Didn't find matching shardID")
}
sourceShardID := merkleProof.ShardID
sourceBlockNum := merkleProof.BlockNum
sourceOutgoingCXReceiptsHash := merkleProof.CXReceiptHash
sha := DeriveSha(cxp.Receipts)
// (1) verify the CXReceipts trie root match
if sha != shardRoot {
return ctxerror.New("[IsValidCXReceiptsProof] Trie Root of ReadCXReceipts Not Match", "sourceShardID", sourceShardID, "sourceBlockNum", sourceBlockNum, "calculated", sha, "got", shardRoot)
}
// (2) verify the outgoingCXReceiptsHash match
outgoingHashFromSourceShard := crypto.Keccak256Hash(byteBuffer.Bytes())
if outgoingHashFromSourceShard != sourceOutgoingCXReceiptsHash {
return ctxerror.New("[IsValidCXReceiptsProof] IncomingReceiptRootHash from source shard not match", "sourceShardID", sourceShardID, "sourceBlockNum", sourceBlockNum, "calculated", outgoingHashFromSourceShard, "got", sourceOutgoingCXReceiptsHash)
}
return nil
}

@ -4,11 +4,15 @@ import (
"encoding/binary"
"errors"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/bls/ffi/go/bls"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
@ -17,6 +21,113 @@ import (
"github.com/harmony-one/harmony/internal/utils"
)
// BroadcastCXReceipts broadcasts cross shard receipts to corresponding
// destination shards
func (node *Node) BroadcastCXReceipts(newBlock *types.Block, lastCommits []byte) {
//#### Read payload data from committed msg
if len(lastCommits) <= 96 {
utils.Logger().Debug().Int("lastCommitsLen", len(lastCommits)).Msg("[BroadcastCXReceipts] lastCommits Not Enough Length")
}
commitSig := make([]byte, 96)
commitBitmap := make([]byte, len(lastCommits)-96)
offset := 0
copy(commitSig[:], lastCommits[offset:offset+96])
offset += 96
copy(commitBitmap[:], lastCommits[offset:])
//#### END Read payload data from committed msg
epoch := newBlock.Header().Epoch()
shardingConfig := core.ShardingSchedule.InstanceForEpoch(epoch)
shardNum := int(shardingConfig.NumShards())
myShardID := node.Consensus.ShardID
utils.Logger().Info().Int("shardNum", shardNum).Uint32("myShardID", myShardID).Uint64("blockNum", newBlock.NumberU64()).Msg("[BroadcastCXReceipts]")
for i := 0; i < shardNum; i++ {
if i == int(myShardID) {
continue
}
cxReceipts, err := node.Blockchain().ReadCXReceipts(uint32(i), newBlock.NumberU64(), newBlock.Hash(), false)
if err != nil || len(cxReceipts) == 0 {
utils.Logger().Warn().Err(err).Uint32("ToShardID", uint32(i)).Int("numCXReceipts", len(cxReceipts)).Msg("[BroadcastCXReceipts] No ReadCXReceipts found")
continue
}
merkleProof, err := node.Blockchain().CXMerkleProof(uint32(i), newBlock)
if err != nil {
utils.Logger().Warn().Uint32("ToShardID", uint32(i)).Msg("[BroadcastCXReceipts] Unable to get merkleProof")
continue
}
utils.Logger().Info().Uint32("ToShardID", uint32(i)).Msg("[BroadcastCXReceipts] ReadCXReceipts and MerkleProof Found")
groupID := p2p.ShardID(i)
go node.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(groupID)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsProof(cxReceipts, merkleProof, newBlock.Header(), commitSig, commitBitmap)))
}
}
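BroadcastCXReceipts treats lastCommits as a 96-byte aggregate BLS signature followed by the signer bitmap. A small sketch of that split with an explicit length guard; the 96-byte signature size comes from the code above, while returning an error on short input is my addition (the function above only logs a warning):

package main

import (
	"errors"
	"fmt"
)

const blsSigSize = 96 // serialized BLS aggregate signature length used above

// splitCommitPayload splits a committed-message payload into the aggregate
// signature and the signer bitmap, as BroadcastCXReceipts does.
func splitCommitPayload(lastCommits []byte) (sig, bitmap []byte, err error) {
	if len(lastCommits) <= blsSigSize {
		return nil, nil, errors.New("lastCommits payload too short")
	}
	sig = make([]byte, blsSigSize)
	copy(sig, lastCommits[:blsSigSize])
	bitmap = make([]byte, len(lastCommits)-blsSigSize)
	copy(bitmap, lastCommits[blsSigSize:])
	return sig, bitmap, nil
}

func main() {
	payload := make([]byte, 100) // 96-byte signature + 4-byte bitmap
	sig, bitmap, err := splitCommitPayload(payload)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(len(sig), len(bitmap)) // 96 4
}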
// VerifyBlockCrossLinks verifies the cross links of the block
func (node *Node) VerifyBlockCrossLinks(block *types.Block) error {
if len(block.Header().CrossLinks()) == 0 {
return nil
}
crossLinks := &types.CrossLinks{}
err := rlp.DecodeBytes(block.Header().CrossLinks(), crossLinks)
if err != nil {
return ctxerror.New("[CrossLinkVerification] failed to decode cross links",
"blockHash", block.Hash(),
"crossLinks", len(block.Header().CrossLinks()),
).WithCause(err)
}
if !crossLinks.IsSorted() {
return ctxerror.New("[CrossLinkVerification] cross links are not sorted",
"blockHash", block.Hash(),
"crossLinks", len(block.Header().CrossLinks()),
)
}
firstCrossLinkBlock := core.EpochFirstBlock(node.Blockchain().Config().CrossLinkEpoch)
for i, crossLink := range *crossLinks {
lastLink := &types.CrossLink{}
if i == 0 {
if crossLink.BlockNum().Cmp(firstCrossLinkBlock) > 0 {
lastLink, err = node.Blockchain().ReadShardLastCrossLink(crossLink.ShardID())
if err != nil {
return ctxerror.New("[CrossLinkVerification] no last cross link found 1",
"blockHash", block.Hash(),
"crossLink", lastLink,
).WithCause(err)
}
}
} else {
if (*crossLinks)[i-1].Header().ShardID() != crossLink.Header().ShardID() {
if crossLink.BlockNum().Cmp(firstCrossLinkBlock) > 0 {
lastLink, err = node.Blockchain().ReadShardLastCrossLink(crossLink.ShardID())
if err != nil {
return ctxerror.New("[CrossLinkVerification] no last cross link found 2",
"blockHash", block.Hash(),
"crossLink", lastLink,
).WithCause(err)
}
}
} else {
lastLink = &(*crossLinks)[i-1]
}
}
if crossLink.BlockNum().Cmp(firstCrossLinkBlock) > 0 { // TODO: verify genesis block
err = node.VerifyCrosslinkHeader(lastLink.Header(), crossLink.Header())
if err != nil {
return ctxerror.New("cannot ValidateNewBlock",
"blockHash", block.Hash(),
"numTx", len(block.Transactions()),
).WithCause(err)
}
}
}
return nil
}
// ProcessHeaderMessage verify and process Node/Header message into crosslink when it's valid
func (node *Node) ProcessHeaderMessage(msgPayload []byte) {
if node.NodeConfig.ShardID == 0 {
@ -99,21 +210,25 @@ func (node *Node) verifyIncomingReceipts(block *types.Block) error {
m := make(map[common.Hash]bool)
cxps := block.IncomingReceipts()
for _, cxp := range cxps {
if err := cxp.IsValidCXReceiptsProof(); err != nil {
return ctxerror.New("[verifyIncomingReceipts] verification failed").WithCause(err)
}
// double spent
if node.Blockchain().IsSpent(cxp) {
return ctxerror.New("[verifyIncomingReceipts] Double Spent!")
}
hash := cxp.MerkleProof.BlockHash
// ignore duplicated receipts
// duplicated receipts
if _, ok := m[hash]; ok {
return ctxerror.New("[verifyIncomingReceipts] Double Spent!")
}
m[hash] = true
if err := node.compareCrosslinkWithReceipts(cxp); err != nil {
return err
for _, item := range cxp.Receipts {
if item.ToShardID != node.Blockchain().ShardID() {
return ctxerror.New("[verifyIncomingReceipts] Invalid ToShardID", "myShardID", node.Blockchain().ShardID(), "expectShardID", item.ToShardID)
}
}
if err := core.IsValidCXReceiptsProof(cxp); err != nil {
return ctxerror.New("[verifyIncomingReceipts] verification failed").WithCause(err)
}
}
@ -128,34 +243,6 @@ func (node *Node) verifyIncomingReceipts(block *types.Block) error {
return nil
}
func (node *Node) compareCrosslinkWithReceipts(cxp *types.CXReceiptsProof) error {
var hash, outgoingReceiptHash common.Hash
shardID := cxp.MerkleProof.ShardID
blockNum := cxp.MerkleProof.BlockNum.Uint64()
beaconChain := node.Beaconchain()
if shardID == 0 {
block := beaconChain.GetBlockByNumber(blockNum)
if block == nil {
return ctxerror.New("[compareCrosslinkWithReceipts] Cannot get beaconchain header", "blockNum", blockNum, "shardID", shardID)
}
hash = block.Hash()
outgoingReceiptHash = block.OutgoingReceiptHash()
} else {
crossLink, err := beaconChain.ReadCrossLink(shardID, blockNum, false)
if err != nil {
return ctxerror.New("[compareCrosslinkWithReceipts] Cannot get crosslink", "blockNum", blockNum, "shardID", shardID).WithCause(err)
}
hash = crossLink.ChainHeader.Hash()
outgoingReceiptHash = crossLink.ChainHeader.OutgoingReceiptHash()
}
// verify the source block hash is from a finalized block
if hash == cxp.MerkleProof.BlockHash && outgoingReceiptHash == cxp.MerkleProof.CXReceiptHash {
return nil
}
return ErrCrosslinkVerificationFail
}
// VerifyCrosslinkHeader verifies the header is valid against the prevHeader.
func (node *Node) VerifyCrosslinkHeader(prevHeader, header *block.Header) error {
@ -284,12 +371,6 @@ func (node *Node) ProcessReceiptMessage(msgPayload []byte) {
utils.Logger().Error().Err(err).Msg("[ProcessReceiptMessage] Unable to Decode message Payload")
return
}
if err := cxp.IsValidCXReceiptsProof(); err != nil {
utils.Logger().Error().Err(err).Msg("[ProcessReceiptMessage] Invalid CXReceiptsProof")
return
}
utils.Logger().Debug().Interface("cxp", cxp).Msg("[ProcessReceiptMessage] Add CXReceiptsProof to pending Receipts")
// TODO: integrate with txpool
node.AddPendingReceipts(&cxp)

@ -3,15 +3,9 @@ package node
import (
"bytes"
"context"
"errors"
"math"
"math/big"
"os"
"os/exec"
"strconv"
"sync"
"sync/atomic"
"syscall"
"time"
"github.com/ethereum/go-ethereum/common"
@ -26,7 +20,6 @@ import (
"github.com/harmony-one/harmony/api/proto/message"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/block"
"github.com/harmony-one/harmony/contracts/structs"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
@ -302,43 +295,13 @@ func (node *Node) BroadcastCrossLinkHeader(newBlock *types.Block) {
node.host.SendMessageToGroups([]p2p.GroupID{node.NodeConfig.GetBeaconGroupID()}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCrossLinkHeadersMessage(headers)))
}
// BroadcastCXReceipts broadcasts cross shard receipts to corresponding
// destination shards
func (node *Node) BroadcastCXReceipts(newBlock *types.Block) {
epoch := newBlock.Header().Epoch()
shardingConfig := core.ShardingSchedule.InstanceForEpoch(epoch)
shardNum := int(shardingConfig.NumShards())
myShardID := node.Consensus.ShardID
utils.Logger().Info().Int("shardNum", shardNum).Uint32("myShardID", myShardID).Uint64("blockNum", newBlock.NumberU64()).Msg("[BroadcastCXReceipts]")
for i := 0; i < shardNum; i++ {
if i == int(myShardID) {
continue
}
cxReceipts, err := node.Blockchain().ReadCXReceipts(uint32(i), newBlock.NumberU64(), newBlock.Hash(), false)
if err != nil || len(cxReceipts) == 0 {
utils.Logger().Warn().Err(err).Uint32("ToShardID", uint32(i)).Int("numCXReceipts", len(cxReceipts)).Msg("[BroadcastCXReceipts] No ReadCXReceipts found")
continue
}
merkleProof, err := node.Blockchain().CXMerkleProof(uint32(i), newBlock)
if err != nil {
utils.Logger().Warn().Uint32("ToShardID", uint32(i)).Msg("[BroadcastCXReceipts] Unable to get merkleProof")
continue
}
utils.Logger().Info().Uint32("ToShardID", uint32(i)).Msg("[BroadcastCXReceipts] ReadCXReceipts and MerkleProof Found")
groupID := p2p.ShardID(i)
go node.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(groupID)}, host.ConstructP2pMessage(byte(0), proto_node.ConstructCXReceiptsProof(cxReceipts, merkleProof)))
}
}
// VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on
func (node *Node) VerifyNewBlock(newBlock *types.Block) error {
// TODO ek – where do we verify parent-child invariants,
// e.g. "child.Number == child.IsGenesis() ? 0 : parent.Number+1"?
if newBlock.NumberU64() > 1 {
err := core.VerifyBlockLastCommitSigs(node.Blockchain(), newBlock)
err := core.VerifyBlockLastCommitSigs(node.Blockchain(), newBlock.Header())
if err != nil {
return err
}
@ -384,182 +347,16 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) error {
return nil
}
// VerifyBlockCrossLinks verifies the cross links of the block
func (node *Node) VerifyBlockCrossLinks(block *types.Block) error {
if len(block.Header().CrossLinks()) == 0 {
return nil
}
crossLinks := &types.CrossLinks{}
err := rlp.DecodeBytes(block.Header().CrossLinks(), crossLinks)
if err != nil {
return ctxerror.New("[CrossLinkVerification] failed to decode cross links",
"blockHash", block.Hash(),
"crossLinks", len(block.Header().CrossLinks()),
).WithCause(err)
}
if !crossLinks.IsSorted() {
return ctxerror.New("[CrossLinkVerification] cross links are not sorted",
"blockHash", block.Hash(),
"crossLinks", len(block.Header().CrossLinks()),
)
}
firstCrossLinkBlock := core.EpochFirstBlock(node.Blockchain().Config().CrossLinkEpoch)
for i, crossLink := range *crossLinks {
lastLink := &types.CrossLink{}
if i == 0 {
if crossLink.BlockNum().Cmp(firstCrossLinkBlock) > 0 {
lastLink, err = node.Blockchain().ReadShardLastCrossLink(crossLink.ShardID())
if err != nil {
return ctxerror.New("[CrossLinkVerification] no last cross link found 1",
"blockHash", block.Hash(),
"crossLink", lastLink,
).WithCause(err)
}
}
} else {
if (*crossLinks)[i-1].Header().ShardID() != crossLink.Header().ShardID() {
if crossLink.BlockNum().Cmp(firstCrossLinkBlock) > 0 {
lastLink, err = node.Blockchain().ReadShardLastCrossLink(crossLink.ShardID())
if err != nil {
return ctxerror.New("[CrossLinkVerification] no last cross link found 2",
"blockHash", block.Hash(),
"crossLink", lastLink,
).WithCause(err)
}
}
} else {
lastLink = &(*crossLinks)[i-1]
}
}
if crossLink.BlockNum().Cmp(firstCrossLinkBlock) > 0 { // TODO: verify genesis block
err = node.VerifyCrosslinkHeader(lastLink.Header(), crossLink.Header())
if err != nil {
return ctxerror.New("cannot ValidateNewBlock",
"blockHash", block.Hash(),
"numTx", len(block.Transactions()),
).WithCause(err)
}
}
}
return nil
}
// BigMaxUint64 is the maximum possible uint64 value, that is, (2**64)-1.
var BigMaxUint64 = new(big.Int).SetBytes([]byte{
255, 255, 255, 255, 255, 255, 255, 255,
})
// validateNewShardState validates whether the new shard state root matches
func (node *Node) validateNewShardState(block *types.Block, stakeInfo *map[common.Address]*structs.StakeInfo) error {
// Common case first – blocks without resharding proposal
header := block.Header()
if header.ShardStateHash() == (common.Hash{}) {
// No new shard state was proposed
if block.ShardID() == 0 {
if core.IsEpochLastBlock(block) {
// TODO ek - invoke view change
return errors.New("beacon leader did not propose resharding")
}
} else {
if node.nextShardState.master != nil &&
!time.Now().Before(node.nextShardState.proposeTime) {
// TODO ek – invoke view change
return errors.New("regular leader did not propose resharding")
}
}
// We aren't expecting to reshard, so proceed to sign
return nil
}
shardState := &shard.State{}
err := rlp.DecodeBytes(header.ShardState(), shardState)
if err != nil {
return err
}
proposed := *shardState
if block.ShardID() == 0 {
// Beacon validators independently recalculate the master state and
// compare it against the proposed copy.
nextEpoch := new(big.Int).Add(block.Header().Epoch(), common.Big1)
// TODO ek – this may be called from regular shards,
// for vetting beacon chain blocks received during block syncing.
// DRand may or may not get in the way. Test this out.
expected, err := core.CalculateNewShardState(
node.Blockchain(), nextEpoch, stakeInfo)
if err != nil {
return ctxerror.New("cannot calculate expected shard state").
WithCause(err)
}
if shard.CompareShardState(expected, proposed) != 0 {
// TODO ek – log state proposal differences
// TODO ek – this error should trigger view change
err := errors.New("shard state proposal is different from expected")
// TODO ek/chao – calculated shard state is different even with the
// same input, i.e. it is nondeterministic.
// Don't treat this as a blocker until we fix the nondeterminism.
//return err
ctxerror.Log15(utils.GetLogger().Warn, err)
}
} else {
// Regular validators fetch the local-shard copy on the beacon chain
// and compare it against the proposed copy.
//
// We trust the master proposal in our copy of beacon chain.
// The sanity check for the master proposal is done earlier,
// when the beacon block containing the master proposal is received
// and before it is admitted into the local beacon chain.
//
// TODO ek – fetch masterProposal from beaconchain instead
masterProposal := node.nextShardState.master.ShardState
expected := masterProposal.FindCommitteeByID(block.ShardID())
switch len(proposed) {
case 0:
// Proposal to discontinue shard
if expected != nil {
// TODO ek – invoke view change
return errors.New(
"leader proposed to disband against beacon decision")
}
case 1:
// Proposal to continue shard
proposed := proposed[0]
// Sanity check: Shard ID should match
if proposed.ShardID != block.ShardID() {
// TODO ek – invoke view change
return ctxerror.New("proposal has incorrect shard ID",
"proposedShard", proposed.ShardID,
"blockShard", block.ShardID())
}
// Did beaconchain say we are no more?
if expected == nil {
// TODO ek – invoke view change
return errors.New(
"leader proposed to continue against beacon decision")
}
// Did beaconchain say the same proposal?
if shard.CompareCommittee(expected, &proposed) != 0 {
// TODO ek – log differences
// TODO ek – invoke view change
return errors.New("proposal differs from one in beacon chain")
}
default:
// TODO ek – invoke view change
return ctxerror.New(
"regular resharding proposal has incorrect number of shards",
"numShards", len(proposed))
}
}
return nil
}
// PostConsensusProcessing is called by consensus participants, after consensus is done, to:
// 1. add the new block to blockchain
// 2. [leader] send new block to the client
// 3. [leader] send cross shard tx receipts to destination shard
func (node *Node) PostConsensusProcessing(newBlock *types.Block) {
func (node *Node) PostConsensusProcessing(newBlock *types.Block, commitSigAndBitmap []byte) {
if err := node.AddNewBlock(newBlock); err != nil {
utils.Logger().Error().
Err(err).
@ -578,15 +375,15 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) {
} else {
node.BroadcastCrossLinkHeader(newBlock)
}
node.BroadcastCXReceipts(newBlock)
node.BroadcastCXReceipts(newBlock, commitSigAndBitmap)
} else {
utils.Logger().Info().
Uint64("ViewID", node.Consensus.GetViewID()).
Msg("BINGO !!! Reached Consensus")
}
// TODO chao: Write New checkpoint after clean
node.Blockchain().CleanCXReceiptsCheckpointsByBlock(newBlock)
// TODO chao: uncomment this after beacon syncing is stable
// node.Blockchain().UpdateCXReceiptsCheckpointsByBlock(newBlock)
if node.NodeConfig.GetNetworkType() != nodeconfig.Mainnet {
// Update contract deployer's nonce so default contract like faucet can issue transaction with current nonce
@ -643,22 +440,6 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) {
}
}
func (node *Node) broadcastEpochShardState(newBlock *types.Block) error {
shardState, err := newBlock.Header().GetShardState()
if err != nil {
return err
}
epochShardStateMessage := proto_node.ConstructEpochShardStateMessage(
shard.EpochShardState{
Epoch: newBlock.Header().Epoch().Uint64() + 1,
ShardState: shardState,
},
)
return node.host.SendMessageToGroups(
[]p2p.GroupID{node.NodeConfig.GetClientGroupID()},
host.ConstructP2pMessage(byte(0), epochShardStateMessage))
}
// AddNewBlock is used to add a new block into the blockchain.
func (node *Node) AddNewBlock(newBlock *types.Block) error {
_, err := node.Blockchain().InsertChain([]*types.Block{newBlock})
@ -804,164 +585,6 @@ func (node *Node) bootstrapConsensus() {
}
}
func (node *Node) epochShardStateMessageHandler(msgPayload []byte) error {
epochShardState, err := proto_node.DeserializeEpochShardStateFromMessage(msgPayload)
if err != nil {
return ctxerror.New("Can't get shard state message").WithCause(err)
}
if node.Consensus == nil {
return nil
}
receivedEpoch := big.NewInt(int64(epochShardState.Epoch))
utils.Logger().Info().
Int64("epoch", receivedEpoch.Int64()).
Msg("received new shard state")
node.nextShardState.master = epochShardState
if node.Consensus.IsLeader() {
// Wait a bit to allow the master table to reach other validators.
node.nextShardState.proposeTime = time.Now().Add(5 * time.Second)
} else {
// Wait a bit to allow the master table to reach the leader,
// and to allow the leader to propose next shard state based upon it.
node.nextShardState.proposeTime = time.Now().Add(15 * time.Second)
}
// TODO ek – this should be done from replaying beaconchain once
// beaconchain sync is fixed
err = node.Beaconchain().WriteShardState(
receivedEpoch, epochShardState.ShardState)
if err != nil {
return ctxerror.New("cannot store shard state", "epoch", receivedEpoch).
WithCause(err)
}
return nil
}
/*
func (node *Node) transitionIntoNextEpoch(shardState types.State) {
logger = logger.New(
"blsPubKey", hex.EncodeToString(node.Consensus.PubKey.Serialize()),
"curShard", node.Blockchain().ShardID(),
"curLeader", node.Consensus.IsLeader())
for _, c := range shardState {
utils.Logger().Debug().
Uint32("shardID", c.ShardID).
Str("nodeList", c.NodeList).
Msg("new shard information")
}
myShardID, isNextLeader := findRoleInShardState(
node.Consensus.PubKey, shardState)
logger = logger.New(
"nextShard", myShardID,
"nextLeader", isNextLeader)
if myShardID == math.MaxUint32 {
getLogger().Info("Somehow I got kicked out. Exiting")
os.Exit(8) // 8 represents it's a loop and the program restart itself
}
myShardState := shardState[myShardID]
// Update public keys
var publicKeys []*bls.PublicKey
for idx, nodeID := range myShardState.NodeList {
key := &bls.PublicKey{}
err := key.Deserialize(nodeID.BlsPublicKey[:])
if err != nil {
getLogger().Error("Failed to deserialize BLS public key in shard state",
"idx", idx,
"error", err)
}
publicKeys = append(publicKeys, key)
}
node.Consensus.UpdatePublicKeys(publicKeys)
// node.DRand.UpdatePublicKeys(publicKeys)
if node.Blockchain().ShardID() == myShardID {
getLogger().Info("staying in the same shard")
} else {
getLogger().Info("moving to another shard")
if err := node.shardChains.Close(); err != nil {
getLogger().Error("cannot close shard chains", "error", err)
}
restartProcess(getRestartArguments(myShardID))
}
}
*/
func findRoleInShardState(
key *bls.PublicKey, state shard.State,
) (shardID uint32, isLeader bool) {
keyBytes := key.Serialize()
for idx, shard := range state {
for nodeIdx, nodeID := range shard.NodeList {
if bytes.Compare(nodeID.BlsPublicKey[:], keyBytes) == 0 {
return uint32(idx), nodeIdx == 0
}
}
}
return math.MaxUint32, false
}
func restartProcess(args []string) {
execFile, err := getBinaryPath()
if err != nil {
utils.Logger().Error().
Err(err).
Str("file", execFile).
Msg("Failed to get program path when restarting program")
}
utils.Logger().Info().
Strs("args", args).
Strs("env", os.Environ()).
Msg("Restarting program")
err = syscall.Exec(execFile, args, os.Environ())
if err != nil {
utils.Logger().Error().
Err(err).
Msg("Failed to restart program after resharding")
}
panic("syscall.Exec() is not supposed to return")
}
func getRestartArguments(myShardID uint32) []string {
args := os.Args
hasShardID := false
shardIDFlag := "-shard_id"
// newNodeFlag := "-is_newnode"
for i, arg := range args {
if arg == shardIDFlag {
if i+1 < len(args) {
args[i+1] = strconv.Itoa(int(myShardID))
} else {
args = append(args, strconv.Itoa(int(myShardID)))
}
hasShardID = true
}
// TODO: enable this
//if arg == newNodeFlag {
// args[i] = "" // remove new node flag
//}
}
if !hasShardID {
args = append(args, shardIDFlag)
args = append(args, strconv.Itoa(int(myShardID)))
}
return args
}
// Gets the path of this currently running binary program.
func getBinaryPath() (argv0 string, err error) {
argv0, err = exec.LookPath(os.Args[0])
if nil != err {
return
}
if _, err = os.Stat(argv0); nil != err {
return
}
return
}
// ConsensusMessageHandler passes received message in node_handler to consensus
func (node *Node) ConsensusMessageHandler(msgPayload []byte) {
node.Consensus.MsgChan <- msgPayload

@ -14,9 +14,10 @@ import (
"github.com/harmony-one/harmony/shard"
)
// Constants of lower bound limit of a new block.
// Constants for proposing a new block
const (
PeriodicBlock = 200 * time.Millisecond
PeriodicBlock = 200 * time.Millisecond
IncomingReceiptsLimit = 6000 // 2000 * (numShards - 1)
)
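The 6000 figure matches the comment's formula for a four-shard network: roughly 2000 incoming receipts per source shard times (numShards - 1) = 3 other shards. The per-shard figure of 2000 and the shard count of 4 are taken from the comment and assumed, not derived here:

package main

import "fmt"

func main() {
	const perShard = 2000 // per the comment above
	const numShards = 4   // assumed shard count that yields the 6000 limit
	fmt.Println(perShard * (numShards - 1)) // 6000
}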
// WaitForConsensusReadyV2 listen for the readiness signal from consensus and generate new block for consensus.
@ -194,8 +195,10 @@ func (node *Node) proposeLocalShardState(block *types.Block) {
}
func (node *Node) proposeReceiptsProof() []*types.CXReceiptsProof {
numProposed := 0
validReceiptsList := []*types.CXReceiptsProof{}
pendingReceiptsList := []*types.CXReceiptsProof{}
node.pendingCXMutex.Lock()
sort.Slice(node.pendingCXReceipts, func(i, j int) bool {
@ -204,7 +207,12 @@ func (node *Node) proposeReceiptsProof() []*types.CXReceiptsProof {
m := make(map[common.Hash]bool)
Loop:
for _, cxp := range node.pendingCXReceipts {
if numProposed > IncomingReceiptsLimit {
pendingReceiptsList = append(pendingReceiptsList, cxp)
continue
}
// check double spent
if node.Blockchain().IsSpent(cxp) {
utils.Logger().Debug().Interface("cxp", cxp).Msg("[proposeReceiptsProof] CXReceipt is spent")
@ -218,18 +226,25 @@ func (node *Node) proposeReceiptsProof() []*types.CXReceiptsProof {
m[hash] = true
}
if err := node.compareCrosslinkWithReceipts(cxp); err != nil {
utils.Logger().Debug().Err(err).Interface("cxp", cxp).Msg("[proposeReceiptsProof] CrossLink Verify Fail")
if err != ErrCrosslinkVerificationFail {
pendingReceiptsList = append(pendingReceiptsList, cxp)
for _, item := range cxp.Receipts {
if item.ToShardID != node.Blockchain().ShardID() {
continue Loop
}
} else {
utils.Logger().Debug().Interface("cxp", cxp).Msg("[proposeReceiptsProof] CXReceipts Added")
validReceiptsList = append(validReceiptsList, cxp)
}
if err := core.IsValidCXReceiptsProof(cxp); err != nil {
utils.Logger().Error().Err(err).Msg("[proposeReceiptsProof] Invalid CXReceiptsProof")
continue
}
utils.Logger().Debug().Interface("cxp", cxp).Msg("[proposeReceiptsProof] CXReceipts Added")
validReceiptsList = append(validReceiptsList, cxp)
numProposed = numProposed + len(cxp.Receipts)
}
node.pendingCXReceipts = pendingReceiptsList
node.pendingCXMutex.Unlock()
utils.Logger().Debug().Msgf("[proposeReceiptsProof] number of validReceipts %d, pendingReceipts %d", len(validReceiptsList), len(pendingReceiptsList))
utils.Logger().Debug().Msgf("[proposeReceiptsProof] number of validReceipts %d", len(validReceiptsList))
return validReceiptsList
}
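Read as a whole, the loop above converges on a simple selection policy: drop spent or duplicate proofs, keep everything beyond the cap pending for a later block, and only propose proofs whose receipts all target this shard. A self-contained sketch of that policy follows; the proof and receipt types and the selectProofs name are simplified stand-ins, not the harmony types:

package main

import "fmt"

type receipt struct{ ToShardID uint32 }

type proof struct {
    BlockHash string
    Receipts  []receipt
}

const incomingReceiptsLimit = 6000

// selectProofs mirrors the shape of proposeReceiptsProof: proofs past the cap
// stay pending, spent or duplicate proofs are dropped, proofs with receipts
// destined for another shard are skipped, the rest are proposed.
func selectProofs(pending []proof, spent func(proof) bool, myShard uint32) (valid, stillPending []proof) {
    seen := map[string]bool{}
    numProposed := 0
Loop:
    for _, p := range pending {
        if numProposed > incomingReceiptsLimit {
            stillPending = append(stillPending, p)
            continue
        }
        if spent(p) || seen[p.BlockHash] {
            continue // double-spent or duplicate: drop entirely
        }
        seen[p.BlockHash] = true
        for _, r := range p.Receipts {
            if r.ToShardID != myShard {
                continue Loop // not destined for this shard
            }
        }
        valid = append(valid, p)
        numProposed += len(p.Receipts)
    }
    return valid, stillPending
}

func main() {
    pending := []proof{
        {BlockHash: "a", Receipts: []receipt{{ToShardID: 1}}},
        {BlockHash: "a", Receipts: []receipt{{ToShardID: 1}}}, // duplicate block hash
        {BlockHash: "b", Receipts: []receipt{{ToShardID: 2}}}, // wrong destination shard
    }
    valid, still := selectProofs(pending, func(proof) bool { return false }, 1)
    fmt.Println(len(valid), len(still)) // 1 0
}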

@ -0,0 +1,304 @@
package node
import (
"bytes"
"errors"
"math"
"math/big"
"os"
"os/exec"
"strconv"
"syscall"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/contracts/structs"
"time"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/shard"
)
// validateNewShardState validates that the proposed new shard state matches the shard state this node expects
func (node *Node) validateNewShardState(block *types.Block, stakeInfo *map[common.Address]*structs.StakeInfo) error {
// Common case first – blocks without resharding proposal
header := block.Header()
if header.ShardStateHash() == (common.Hash{}) {
// No new shard state was proposed
if block.ShardID() == 0 {
if core.IsEpochLastBlock(block) {
// TODO ek - invoke view change
return errors.New("beacon leader did not propose resharding")
}
} else {
if node.nextShardState.master != nil &&
!time.Now().Before(node.nextShardState.proposeTime) {
// TODO ek – invoke view change
return errors.New("regular leader did not propose resharding")
}
}
// We aren't expecting to reshard, so proceed to sign
return nil
}
shardState := &shard.State{}
err := rlp.DecodeBytes(header.ShardState(), shardState)
if err != nil {
return err
}
proposed := *shardState
if block.ShardID() == 0 {
// Beacon validators independently recalculate the master state and
// compare it against the proposed copy.
nextEpoch := new(big.Int).Add(block.Header().Epoch(), common.Big1)
// TODO ek – this may be called from regular shards,
// for vetting beacon chain blocks received during block syncing.
// DRand may or may not get in the way. Test this out.
expected, err := core.CalculateNewShardState(
node.Blockchain(), nextEpoch, stakeInfo)
if err != nil {
return ctxerror.New("cannot calculate expected shard state").
WithCause(err)
}
if shard.CompareShardState(expected, proposed) != 0 {
// TODO ek – log state proposal differences
// TODO ek – this error should trigger view change
err := errors.New("shard state proposal is different from expected")
// TODO ek/chao – calculated shard state is different even with the
// same input, i.e. it is nondeterministic.
// Don't treat this as a blocker until we fix the nondeterminism.
//return err
ctxerror.Log15(utils.GetLogger().Warn, err)
}
} else {
// Regular validators fetch the local-shard copy on the beacon chain
// and compare it against the proposed copy.
//
// We trust the master proposal in our copy of beacon chain.
// The sanity check for the master proposal is done earlier,
// when the beacon block containing the master proposal is received
// and before it is admitted into the local beacon chain.
//
// TODO ek – fetch masterProposal from beaconchain instead
masterProposal := node.nextShardState.master.ShardState
expected := masterProposal.FindCommitteeByID(block.ShardID())
switch len(proposed) {
case 0:
// Proposal to discontinue shard
if expected != nil {
// TODO ek – invoke view change
return errors.New(
"leader proposed to disband against beacon decision")
}
case 1:
// Proposal to continue shard
proposed := proposed[0]
// Sanity check: Shard ID should match
if proposed.ShardID != block.ShardID() {
// TODO ek – invoke view change
return ctxerror.New("proposal has incorrect shard ID",
"proposedShard", proposed.ShardID,
"blockShard", block.ShardID())
}
// Did beaconchain say we are no more?
if expected == nil {
// TODO ek – invoke view change
return errors.New(
"leader proposed to continue against beacon decision")
}
// Did beaconchain say the same proposal?
if shard.CompareCommittee(expected, &proposed) != 0 {
// TODO ek – log differences
// TODO ek – invoke view change
return errors.New("proposal differs from one in beacon chain")
}
default:
// TODO ek – invoke view change
return ctxerror.New(
"regular resharding proposal has incorrect number of shards",
"numShards", len(proposed))
}
}
return nil
}
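For the non-beacon branch, the switch above boils down to a three-way comparison against the committee the beacon chain has (or has not) assigned to this shard. A compact sketch of just that decision, using a simplified committee type instead of shard.Committee and omitting the deep committee comparison:

package main

import (
    "errors"
    "fmt"
)

// committee is a simplified stand-in for the entry a shard finds (or fails to
// find) for itself in the beacon chain's master resharding proposal.
type committee struct{ ShardID uint32 }

// checkLocalProposal mirrors the switch in validateNewShardState: an empty
// proposal must match a disbanded committee, a one-committee proposal must
// carry the right shard ID and match a continued committee, anything else
// is malformed.
func checkLocalProposal(proposed []committee, expected *committee, blockShard uint32) error {
    switch len(proposed) {
    case 0:
        if expected != nil {
            return errors.New("leader proposed to disband against beacon decision")
        }
    case 1:
        if proposed[0].ShardID != blockShard {
            return errors.New("proposal has incorrect shard ID")
        }
        if expected == nil {
            return errors.New("leader proposed to continue against beacon decision")
        }
    default:
        return errors.New("regular resharding proposal has incorrect number of shards")
    }
    return nil
}

func main() {
    fmt.Println(checkLocalProposal(nil, nil, 1))                                 // <nil>: both sides agree to disband
    fmt.Println(checkLocalProposal([]committee{{ShardID: 1}}, &committee{1}, 1)) // <nil>: both sides agree to continue
    fmt.Println(checkLocalProposal([]committee{{ShardID: 2}}, &committee{1}, 1)) // error: incorrect shard ID
}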
func (node *Node) broadcastEpochShardState(newBlock *types.Block) error {
shardState, err := newBlock.Header().GetShardState()
if err != nil {
return err
}
epochShardStateMessage := proto_node.ConstructEpochShardStateMessage(
shard.EpochShardState{
Epoch: newBlock.Header().Epoch().Uint64() + 1,
ShardState: shardState,
},
)
return node.host.SendMessageToGroups(
[]p2p.GroupID{node.NodeConfig.GetClientGroupID()},
host.ConstructP2pMessage(byte(0), epochShardStateMessage))
}
func (node *Node) epochShardStateMessageHandler(msgPayload []byte) error {
epochShardState, err := proto_node.DeserializeEpochShardStateFromMessage(msgPayload)
if err != nil {
return ctxerror.New("Can't get shard state message").WithCause(err)
}
if node.Consensus == nil {
return nil
}
receivedEpoch := big.NewInt(int64(epochShardState.Epoch))
utils.Logger().Info().
Int64("epoch", receivedEpoch.Int64()).
Msg("received new shard state")
node.nextShardState.master = epochShardState
if node.Consensus.IsLeader() {
// Wait a bit to allow the master table to reach other validators.
node.nextShardState.proposeTime = time.Now().Add(5 * time.Second)
} else {
// Wait a bit to allow the master table to reach the leader,
// and to allow the leader to propose next shard state based upon it.
node.nextShardState.proposeTime = time.Now().Add(15 * time.Second)
}
// TODO ek – this should be done from replaying beaconchain once
// beaconchain sync is fixed
err = node.Beaconchain().WriteShardState(
receivedEpoch, epochShardState.ShardState)
if err != nil {
return ctxerror.New("cannot store shard state", "epoch", receivedEpoch).
WithCause(err)
}
return nil
}
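The two propose-time offsets above implement a simple stagger: the leader waits a short interval for the master table to spread, while validators wait longer so the leader can propose first. A tiny sketch of that stagger (the durations are copied from the handler; proposeDelay is an illustrative helper name):

package main

import (
    "fmt"
    "time"
)

// proposeDelay mirrors the stagger in epochShardStateMessageHandler.
func proposeDelay(isLeader bool) time.Duration {
    if isLeader {
        return 5 * time.Second // give the master table time to reach validators
    }
    return 15 * time.Second // give the leader time to propose first
}

func main() {
    now := time.Now()
    leaderProposeTime := now.Add(proposeDelay(true))
    validatorProposeTime := now.Add(proposeDelay(false))
    fmt.Println(leaderProposeTime.Before(validatorProposeTime)) // true
}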
/*
func (node *Node) transitionIntoNextEpoch(shardState types.State) {
logger = logger.New(
"blsPubKey", hex.EncodeToString(node.Consensus.PubKey.Serialize()),
"curShard", node.Blockchain().ShardID(),
"curLeader", node.Consensus.IsLeader())
for _, c := range shardState {
utils.Logger().Debug().
Uint32("shardID", c.ShardID).
Str("nodeList", c.NodeList).
Msg("new shard information")
}
myShardID, isNextLeader := findRoleInShardState(
node.Consensus.PubKey, shardState)
logger = logger.New(
"nextShard", myShardID,
"nextLeader", isNextLeader)
if myShardID == math.MaxUint32 {
getLogger().Info("Somehow I got kicked out. Exiting")
os.Exit(8) // 8 indicates a restart loop; the program restarts itself
}
myShardState := shardState[myShardID]
// Update public keys
var publicKeys []*bls.PublicKey
for idx, nodeID := range myShardState.NodeList {
key := &bls.PublicKey{}
err := key.Deserialize(nodeID.BlsPublicKey[:])
if err != nil {
getLogger().Error("Failed to deserialize BLS public key in shard state",
"idx", idx,
"error", err)
}
publicKeys = append(publicKeys, key)
}
node.Consensus.UpdatePublicKeys(publicKeys)
// node.DRand.UpdatePublicKeys(publicKeys)
if node.Blockchain().ShardID() == myShardID {
getLogger().Info("staying in the same shard")
} else {
getLogger().Info("moving to another shard")
if err := node.shardChains.Close(); err != nil {
getLogger().Error("cannot close shard chains", "error", err)
}
restartProcess(getRestartArguments(myShardID))
}
}
*/
func findRoleInShardState(
key *bls.PublicKey, state shard.State,
) (shardID uint32, isLeader bool) {
keyBytes := key.Serialize()
for idx, shard := range state {
for nodeIdx, nodeID := range shard.NodeList {
if bytes.Equal(nodeID.BlsPublicKey[:], keyBytes) {
return uint32(idx), nodeIdx == 0
}
}
}
return math.MaxUint32, false
}
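findRoleInShardState treats the first committee containing the key as the node's shard and index 0 in a node list as that shard's leader. A self-contained sketch of the same lookup with simplified stand-in types (plain byte slices instead of BLS public keys):

package main

import (
    "bytes"
    "fmt"
    "math"
)

// Simplified stand-ins for the committee and node-ID types used above.
type nodeID struct{ BlsPublicKey []byte }
type committee struct{ NodeList []nodeID }
type state []committee

// findRole mirrors findRoleInShardState: the first committee containing the
// key wins, and position 0 in its node list marks the leader.
func findRole(key []byte, s state) (shardID uint32, isLeader bool) {
    for idx, c := range s {
        for nodeIdx, n := range c.NodeList {
            if bytes.Equal(n.BlsPublicKey, key) {
                return uint32(idx), nodeIdx == 0
            }
        }
    }
    return math.MaxUint32, false // not in any committee
}

func main() {
    me := []byte("my-bls-key")
    s := state{
        {NodeList: []nodeID{{BlsPublicKey: []byte("someone-else")}}},
        {NodeList: []nodeID{{BlsPublicKey: me}, {BlsPublicKey: []byte("peer")}}},
    }
    id, leader := findRole(me, s)
    fmt.Println(id, leader) // 1 true: second shard, first position, hence leader
}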
func restartProcess(args []string) {
execFile, err := getBinaryPath()
if err != nil {
utils.Logger().Error().
Err(err).
Str("file", execFile).
Msg("Failed to get program path when restarting program")
}
utils.Logger().Info().
Strs("args", args).
Strs("env", os.Environ()).
Msg("Restarting program")
err = syscall.Exec(execFile, args, os.Environ())
if err != nil {
utils.Logger().Error().
Err(err).
Msg("Failed to restart program after resharding")
}
panic("syscall.Exec() is not supposed to return")
}
func getRestartArguments(myShardID uint32) []string {
args := os.Args
hasShardID := false
shardIDFlag := "-shard_id"
// newNodeFlag := "-is_newnode"
for i, arg := range args {
if arg == shardIDFlag {
if i+1 < len(args) {
args[i+1] = strconv.Itoa(int(myShardID))
} else {
args = append(args, strconv.Itoa(int(myShardID)))
}
hasShardID = true
}
// TODO: enable this
//if arg == newNodeFlag {
// args[i] = "" // remove new node flag
//}
}
if !hasShardID {
args = append(args, shardIDFlag)
args = append(args, strconv.Itoa(int(myShardID)))
}
return args
}
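The rewrite rule in getRestartArguments is: overwrite the value of an existing -shard_id flag, otherwise append the flag. A quick standalone restatement of that rule (rewriteShardID and the use of an arbitrary slice instead of os.Args are illustrative choices):

package main

import (
    "fmt"
    "strconv"
)

// rewriteShardID applies the same rule as getRestartArguments: an existing
// "-shard_id" value is overwritten, otherwise the flag and value are appended.
func rewriteShardID(args []string, myShardID uint32) []string {
    const shardIDFlag = "-shard_id"
    has := false
    for i, arg := range args {
        if arg == shardIDFlag {
            if i+1 < len(args) {
                args[i+1] = strconv.Itoa(int(myShardID))
            } else {
                args = append(args, strconv.Itoa(int(myShardID)))
            }
            has = true
        }
    }
    if !has {
        args = append(args, shardIDFlag, strconv.Itoa(int(myShardID)))
    }
    return args
}

func main() {
    fmt.Println(rewriteShardID([]string{"harmony", "-shard_id", "0"}, 3)) // [harmony -shard_id 3]
    fmt.Println(rewriteShardID([]string{"harmony"}, 3))                   // [harmony -shard_id 3]
}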
// getBinaryPath returns the path of the currently running binary.
func getBinaryPath() (argv0 string, err error) {
argv0, err = exec.LookPath(os.Args[0])
if err != nil {
return
}
if _, err = os.Stat(argv0); err != nil {
return
}
return
}

@ -64,10 +64,15 @@ func (node *Node) setupForClientNode() {
}
func (node *Node) setupForExplorerNode() {
// Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, node.NodeConfig.GetShardGroupID(), nil, nil))
nodeConfig, chanPeer := node.initNodeConfiguration()
// Register peer discovery service.
node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, nil))
// Register networkinfo service.
node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, node.NodeConfig.GetShardGroupID(), chanPeer, nil))
// Register explorer service.
node.serviceManager.RegisterService(service.SupportExplorer, explorer.New(&node.SelfPeer, node.NodeConfig.GetShardID(), node.Consensus.GetNodeIDs, node.GetBalanceOfAddress))
// Register explorer service.
}
// ServiceManagerSetup setups service store.
