diff --git a/api/service/explorer/service.go b/api/service/explorer/service.go index b5ffa1b8b..d96c1bc56 100644 --- a/api/service/explorer/service.go +++ b/api/service/explorer/service.go @@ -7,11 +7,13 @@ import ( "fmt" "net" "net/http" + "os" "path" "strconv" "time" "github.com/RoaringBitmap/roaring/roaring64" + "github.com/harmony-one/harmony/internal/common" harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony" ethCommon "github.com/ethereum/go-ethereum/common" @@ -113,12 +115,17 @@ func (s *Service) Run() *http.Server { s.router = mux.NewRouter() + fmt.Println("++", addr) + // Set up router for addresses. // Fetch addresses request, accepts parameter size: how many addresses to read, // parameter prefix: from which address prefix start s.router.Path("/addresses").Queries("size", "{[0-9]*?}", "prefix", "{[a-zA-Z0-9]*?}").HandlerFunc(s.GetAddresses).Methods("GET") s.router.Path("/addresses").HandlerFunc(s.GetAddresses) s.router.Path("/height").HandlerFunc(s.GetHeight) + s.router.Path("/leader").HandlerFunc(s.GetLeader) + s.router.Path("/blocks").HandlerFunc(s.GetBlocks) + s.router.Path("/halt").HandlerFunc(s.halt) // Set up router for supply info s.router.Path("/burn-addresses").Queries().HandlerFunc(s.GetInaccessibleAddressInfo).Methods("GET") @@ -186,6 +193,50 @@ type HeightResponse struct { S3 uint64 `json:"3,omitempty"` } +// GetLeader reports whether this node is currently the leader, followed by its shard ID, port, validator address, and BLS public keys. +func (s *Service) GetLeader(w http.ResponseWriter, r *http.Request) { + if s.backend.IsCurrentlyLeader() { + w.Write([]byte("true ")) + } else { + w.Write([]byte("false")) + } + + keys := "" + for _, p := range s.backend.GetPublicKeys() { + addr := common.Address{} + addrBytes := p.Object.GetAddress() + addr.SetBytes(addrBytes[:]) + keys += fmt.Sprintf("%s ", addr.String()) + break + } + //blsPubKeyBytes := leaderKey.Object.GetAddress() + //coinbase.SetBytes(blsPubKeyBytes[:]) + + w.Write([]byte(fmt.Sprintf(" %d", s.blockchain.ShardID()))) + w.Write([]byte(fmt.Sprintf(" %s", s.Port))) + w.Write([]byte(fmt.Sprintf(" %s", keys))) + w.Write([]byte(fmt.Sprintf(" %s", s.backend.GetPublicKeys().SerializeToHexStr()))) + +} + +// GetBlocks writes the number, view ID, and coinbase of every block from the current tip down to genesis. +func (s *Service) GetBlocks(w http.ResponseWriter, r *http.Request) { + cur := s.blockchain.CurrentHeader().Number().Uint64() + + for i := cur; i > 0; i-- { + block := s.blockchain.GetBlockByNumber(i) + w.Write([]byte(fmt.Sprintf("%d ", i))) + w.Write([]byte(fmt.Sprintf("%s ", block.Header().ViewID().String()))) + w.Write([]byte(fmt.Sprintf("%s ", block.Header().Coinbase().Hash().Hex()))) + w.Write([]byte(fmt.Sprintf("%s\n", block.Header().Coinbase().Hex()))) + } +} + +// halt immediately terminates the node process. +func (s *Service) halt(w http.ResponseWriter, r *http.Request) { + os.Exit(0) +} + // GetHeight returns heights of current and beacon chains if needed.
func (s *Service) GetHeight(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index f01cb758e..0527a0b8c 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -266,6 +266,8 @@ func setupNodeLog(config harmonyconfig.HarmonyConfig) { func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) { var err error + fmt.Println("OS: ", os.Args) + nodeconfigSetShardSchedule(hc) nodeconfig.SetShardingSchedule(shard.Schedule) nodeconfig.SetVersion(getHarmonyVersion()) @@ -409,8 +411,9 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) { if currentNode.NodeConfig.Role() == nodeconfig.Validator { currentNode.RegisterValidatorServices() } else if currentNode.NodeConfig.Role() == nodeconfig.ExplorerNode { - currentNode.RegisterExplorerServices() + } + currentNode.RegisterExplorerServices() currentNode.RegisterService(service.CrosslinkSending, crosslink_sending.New(currentNode, currentNode.Blockchain())) if hc.Pprof.Enabled { setupPprofService(currentNode, hc) @@ -784,6 +787,7 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi // Set the consensus ID to be the current block number viewID := currentNode.Blockchain().CurrentBlock().Header().ViewID().Uint64() + fmt.Println("viewID:", viewID) currentConsensus.SetViewIDs(viewID + 1) utils.Logger().Info(). Uint64("viewID", viewID). diff --git a/consensus/checks.go b/consensus/checks.go index 28da66ad7..ff4b53927 100644 --- a/consensus/checks.go +++ b/consensus/checks.go @@ -56,8 +56,8 @@ func (consensus *Consensus) senderKeySanityChecks(msg *msg_pb.Message, senderKey } func (consensus *Consensus) isRightBlockNumAndViewID(recvMsg *FBFTMessage) bool { - blockNum := consensus.getBlockNum() - if recvMsg.ViewID != consensus.getCurBlockViewID() || recvMsg.BlockNum != blockNum { + blockNum := consensus.BlockNum() + if recvMsg.ViewID != consensus.GetCurBlockViewID() || recvMsg.BlockNum != blockNum { consensus.getLogger().Debug(). Uint64("blockNum", blockNum). Str("recvMsg", recvMsg.String()). diff --git a/consensus/consensus.go b/consensus/consensus.go index ac15adc15..69a4f4638 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -71,6 +71,8 @@ type Consensus struct { priKey multibls.PrivateKeys // the public key of the leader LeaderPubKey *bls.PublicKeyWrapper + // index of leader in the list of validators.
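+	// It is guarded by pubKeyLock (see SetLeaderIndex and GetLeaderIndex).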
+ LeaderIndex int // blockNum: the next blockNumber that FBFT is going to agree on, // should be equal to the blockNumber of next block blockNum uint64 diff --git a/consensus/consensus_service.go b/consensus/consensus_service.go index 0e4bb6814..e646504dd 100644 --- a/consensus/consensus_service.go +++ b/consensus/consensus_service.go @@ -1,13 +1,13 @@ package consensus import ( + "fmt" "math/big" "sync/atomic" "time" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/crypto/bls" - "github.com/harmony-one/harmony/multibls" "github.com/ethereum/go-ethereum/common" protobuf "github.com/golang/protobuf/proto" @@ -74,12 +74,8 @@ func (consensus *Consensus) signAndMarshalConsensusMessage(message *msg_pb.Messa // UpdatePublicKeys updates the PublicKeys for // quorum on current subcommittee, protected by a mutex func (consensus *Consensus) UpdatePublicKeys(pubKeys, allowlist []bls_cosi.PublicKeyWrapper) int64 { - consensus.mutex.Lock() - defer consensus.mutex.Unlock() - return consensus.updatePublicKeys(pubKeys, allowlist) -} - -func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.PublicKeyWrapper) int64 { + // TODO: use the mutex only for updating the public keys pointer; there is no need to hold a lock across all of this logic. + consensus.pubKeyLock.Lock() consensus.Decider.UpdateParticipants(pubKeys, allowlist) consensus.getLogger().Info().Msg("My Committee updated") for i := range pubKeys { @@ -98,20 +94,15 @@ func (consensus *Consensus) updatePublicKeys(pubKeys, allowlist []bls_cosi.Publi consensus.getLogger().Error(). Msg("[UpdatePublicKeys] Participants is empty") } - for i := range pubKeys { - consensus.getLogger().Info(). - Int("index", i). - Str("BLSPubKey", pubKeys[i].Bytes.Hex()). - Msg("Member") - } + consensus.pubKeyLock.Unlock() // reset states after update public keys // TODO: incorporate bitmaps in the decider, so their state can't be inconsistent. - consensus.updateBitmaps() - consensus.resetState() + consensus.UpdateBitmaps() + consensus.ResetState() // do not reset view change state if it is in view changing mode - if !consensus.isViewChangingMode() { - consensus.resetViewChangeState() + if !consensus.IsViewChangingMode() { + consensus.ResetViewChangeState() } return consensus.Decider.ParticipantsCount() } @@ -138,7 +129,7 @@ func (consensus *Consensus) signConsensusMessage(message *msg_pb.Message, } // UpdateBitmaps update the bitmaps for prepare and commit phase -func (consensus *Consensus) updateBitmaps() { +func (consensus *Consensus) UpdateBitmaps() { consensus.getLogger().Debug(). Str("MessageType", consensus.phase.String()).
Msg("[UpdateBitmaps] Updating consensus bitmaps") @@ -148,12 +139,13 @@ func (consensus *Consensus) updateBitmaps() { multiSigBitmap, _ := bls_cosi.NewMask(members, nil) consensus.prepareBitmap = prepareBitmap consensus.commitBitmap = commitBitmap + consensus.multiSigMutex.Lock() consensus.multiSigBitmap = multiSigBitmap - + consensus.multiSigMutex.Unlock() } // ResetState resets the state of the consensus -func (consensus *Consensus) resetState() { +func (consensus *Consensus) ResetState() { consensus.switchPhase("ResetState", FBFTAnnounce) consensus.blockHash = [32]byte{} @@ -171,24 +163,11 @@ func (consensus *Consensus) resetState() { // IsValidatorInCommittee returns whether the given validator BLS address is part of my committee func (consensus *Consensus) IsValidatorInCommittee(pubKey bls.SerializedPublicKey) bool { - consensus.mutex.RLock() - defer consensus.mutex.RUnlock() - return consensus.isValidatorInCommittee(pubKey) -} - -func (consensus *Consensus) isValidatorInCommittee(pubKey bls.SerializedPublicKey) bool { return consensus.Decider.IndexOf(pubKey) != -1 } // SetMode sets the mode of consensus func (consensus *Consensus) SetMode(m Mode) { - consensus.mutex.Lock() - defer consensus.mutex.Unlock() - consensus.setMode(m) -} - -// SetMode sets the mode of consensus -func (consensus *Consensus) setMode(m Mode) { if m == Normal && consensus.isBackup { m = NormalBackup } @@ -210,13 +189,6 @@ func (consensus *Consensus) SetIsBackup(isBackup bool) { // Mode returns the mode of consensus func (consensus *Consensus) Mode() Mode { - consensus.mutex.RLock() - defer consensus.mutex.RUnlock() - return consensus.mode() -} - -// mode returns the mode of consensus -func (consensus *Consensus) mode() Mode { return consensus.current.Mode() } @@ -236,11 +208,12 @@ func (consensus *Consensus) checkViewID(msg *FBFTMessage) error { if consensus.IgnoreViewIDCheck.IsSet() { //in syncing mode, node accepts incoming messages without viewID/leaderKey checking //so only set mode to normal when new node enters consensus and need checking viewID - consensus.setMode(Normal) - consensus.setViewIDs(msg.ViewID) + consensus.SetMode(Normal) + consensus.SetViewIDs(msg.ViewID) if !msg.HasSingleSender() { return errors.New("Leader message can not have multiple sender keys") } + fmt.Println("[checkViewID] Set LEADEER PUB KEY ", msg.SenderPubkeys[0].Bytes.Hex(), utils.GetPort()) consensus.LeaderPubKey = msg.SenderPubkeys[0] consensus.IgnoreViewIDCheck.UnSet() consensus.consensusTimeout[timeoutConsensus].Start() @@ -248,9 +221,9 @@ func (consensus *Consensus) checkViewID(msg *FBFTMessage) error { Str("leaderKey", consensus.LeaderPubKey.Bytes.Hex()). 
Msg("[checkViewID] Start consensus timer") return nil - } else if msg.ViewID > consensus.getCurBlockViewID() { + } else if msg.ViewID > consensus.GetCurBlockViewID() { return consensus_engine.ErrViewIDNotMatch - } else if msg.ViewID < consensus.getCurBlockViewID() { + } else if msg.ViewID < consensus.GetCurBlockViewID() { return errors.New("view ID belongs to the past") } return nil @@ -261,26 +234,17 @@ func (consensus *Consensus) SetBlockNum(blockNum uint64) { atomic.StoreUint64(&consensus.blockNum, blockNum) } -// SetBlockNum sets the blockNum in consensus object, called at node bootstrap -func (consensus *Consensus) setBlockNum(blockNum uint64) { - atomic.StoreUint64(&consensus.blockNum, blockNum) -} - // ReadSignatureBitmapPayload read the payload for signature and bitmap; offset is the beginning position of reading -func (consensus *Consensus) ReadSignatureBitmapPayload(recvPayload []byte, offset int) (*bls_core.Sign, *bls_cosi.Mask, error) { - consensus.mutex.RLock() - members := consensus.Decider.Participants() - consensus.mutex.RUnlock() - return consensus.readSignatureBitmapPayload(recvPayload, offset, members) -} - -func (consensus *Consensus) readSignatureBitmapPayload(recvPayload []byte, offset int, members multibls.PublicKeys) (*bls_core.Sign, *bls_cosi.Mask, error) { +func (consensus *Consensus) ReadSignatureBitmapPayload( + recvPayload []byte, offset int, +) (*bls_core.Sign, *bls_cosi.Mask, error) { if offset+bls.BLSSignatureSizeInBytes > len(recvPayload) { return nil, nil, errors.New("payload not have enough length") } sigAndBitmapPayload := recvPayload[offset:] // TODO(audit): keep a Mask in the Decider so it won't be reconstructed on the fly. + members := consensus.Decider.Participants() return chain.ReadSignatureBitmapByPublicKeys( sigAndBitmapPayload, members, ) @@ -297,12 +261,6 @@ func (consensus *Consensus) readSignatureBitmapPayload(recvPayload []byte, offse // (b) node in committed but has any err during processing: Syncing mode // (c) node in committed and everything looks good: Normal mode func (consensus *Consensus) UpdateConsensusInformation() Mode { - consensus.mutex.Lock() - defer consensus.mutex.Unlock() - return consensus.updateConsensusInformation() -} - -func (consensus *Consensus) updateConsensusInformation() Mode { curHeader := consensus.Blockchain().CurrentHeader() curEpoch := curHeader.Epoch() nextEpoch := new(big.Int).Add(curHeader.Epoch(), common.Big1) @@ -407,7 +365,7 @@ func (consensus *Consensus) updateConsensusInformation() Mode { consensus.getLogger().Info(). Int("numPubKeys", len(pubKeys)). Msg("[UpdateConsensusInformation] Successfully updated public keys") - consensus.updatePublicKeys(pubKeys, shard.Schedule.InstanceForEpoch(nextEpoch).ExternalAllowlist()) + consensus.UpdatePublicKeys(pubKeys, shard.Schedule.InstanceForEpoch(nextEpoch).ExternalAllowlist()) // Update voters in the committee if _, err := consensus.Decider.SetVoters( @@ -438,13 +396,16 @@ func (consensus *Consensus) updateConsensusInformation() Mode { consensus.getLogger().Info(). Str("leaderPubKey", leaderPubKey.Bytes.Hex()). 
Msg("[UpdateConsensusInformation] Most Recent LeaderPubKey Updated Based on BlockChain") + consensus.pubKeyLock.Lock() + fmt.Println("[UpdateConsensusInformation] Most Recent LeaderPubKey Updated Based on BlockChain", leaderPubKey.Bytes.Hex(), utils.GetPort()) consensus.LeaderPubKey = leaderPubKey + consensus.pubKeyLock.Unlock() } } for _, key := range pubKeys { // in committee - myPubKeys := consensus.getPublicKeys() + myPubKeys := consensus.GetPublicKeys() if myPubKeys.Contains(key.Object) { if hasError { consensus.getLogger().Error(). @@ -456,7 +417,7 @@ func (consensus *Consensus) updateConsensusInformation() Mode { // If the leader changed and I myself become the leader if (oldLeader != nil && consensus.LeaderPubKey != nil && - !consensus.LeaderPubKey.Object.IsEqual(oldLeader.Object)) && consensus.isLeader() { + !consensus.LeaderPubKey.Object.IsEqual(oldLeader.Object)) && consensus.IsLeader() { go func() { consensus.getLogger().Info(). Str("myKey", myPubKeys.SerializeToHexStr()). @@ -477,10 +438,15 @@ func (consensus *Consensus) updateConsensusInformation() Mode { // IsLeader check if the node is a leader or not by comparing the public key of // the node with the leader public key func (consensus *Consensus) IsLeader() bool { - consensus.mutex.RLock() - defer consensus.mutex.RUnlock() - - return consensus.isLeader() + consensus.pubKeyLock.Lock() + obj := consensus.LeaderPubKey.Object + consensus.pubKeyLock.Unlock() + for _, key := range consensus.priKey { + if key.Pub.Object.IsEqual(obj) { + return true + } + } + return false } // isLeader check if the node is a leader or not by comparing the public key of @@ -498,16 +464,9 @@ func (consensus *Consensus) isLeader() bool { // SetViewIDs set both current view ID and view changing ID to the height // of the blockchain. It is used during client startup to recover the state func (consensus *Consensus) SetViewIDs(height uint64) { - consensus.mutex.Lock() - defer consensus.mutex.Unlock() - consensus.setViewIDs(height) -} - -// SetViewIDs set both current view ID and view changing ID to the height -// of the blockchain. It is used during client startup to recover the state -func (consensus *Consensus) setViewIDs(height uint64) { - consensus.setCurBlockViewID(height) - consensus.setViewChangingID(height) + fmt.Println("SetViewIDs", height) + consensus.SetCurBlockViewID(height) + consensus.SetViewChangingID(height) } // SetCurBlockViewID set the current view ID @@ -515,18 +474,22 @@ func (consensus *Consensus) SetCurBlockViewID(viewID uint64) uint64 { return consensus.current.SetCurBlockViewID(viewID) } -// SetCurBlockViewID set the current view ID -func (consensus *Consensus) setCurBlockViewID(viewID uint64) { - consensus.current.SetCurBlockViewID(viewID) +// SetLeaderIndex set the leader index. 
+func (consensus *Consensus) SetLeaderIndex(f func(int) int) (current int) { + consensus.pubKeyLock.Lock() + defer consensus.pubKeyLock.Unlock() + consensus.LeaderIndex = f(consensus.LeaderIndex) + return consensus.LeaderIndex } -// SetViewChangingID set the current view change ID -func (consensus *Consensus) SetViewChangingID(viewID uint64) { - consensus.current.SetViewChangingID(viewID) +func (consensus *Consensus) GetLeaderIndex() int { + consensus.pubKeyLock.Lock() + defer consensus.pubKeyLock.Unlock() + return consensus.LeaderIndex } // SetViewChangingID set the current view change ID -func (consensus *Consensus) setViewChangingID(viewID uint64) { +func (consensus *Consensus) SetViewChangingID(viewID uint64) { consensus.current.SetViewChangingID(viewID) } @@ -535,6 +498,15 @@ func (consensus *Consensus) StartFinalityCount() { consensus.finalityCounter.Store(time.Now().UnixNano()) } +//func (consensus *Consensus) ReshardingNextLeader(newblock *types.Block) { +// consensus.pubKeyLock.Lock() +// fmt.Println("nextBlock1 ", newblock.Header().Number().Uint64(), " ", consensus.LeaderPubKey.Bytes.Hex()) +// consensus.LeaderPubKey = consensus.getNextLeaderKey(consensus.GetCurBlockViewID() + 1) +// fmt.Println("nextBlock2 ", newblock.Header().Number().Uint64(), " ", consensus.LeaderPubKey.Bytes.Hex()) +// consensus.pubKeyLock.Unlock() +// +//} + // FinishFinalityCount calculate the current finality func (consensus *Consensus) FinishFinalityCount() { d := time.Now().UnixNano() @@ -554,7 +526,8 @@ func (consensus *Consensus) switchPhase(subject string, desired FBFTPhase) { Str("to:", desired.String()). Str("switchPhase:", subject) - consensus.phase = desired + consensus.phase.Set(desired) + return } var ( @@ -576,13 +549,13 @@ func (consensus *Consensus) selfCommit(payload []byte) error { return errGetPreparedBlock } - aggSig, mask, err := consensus.readSignatureBitmapPayload(payload, 32, consensus.Decider.Participants()) + aggSig, mask, err := consensus.ReadSignatureBitmapPayload(payload, 32) if err != nil { return errReadBitmapPayload } // Have to keep the block hash so the leader can finish the commit phase of prepared block - consensus.resetState() + consensus.ResetState() copy(consensus.blockHash[:], blockHash[:]) consensus.switchPhase("selfCommit", FBFTCommit) @@ -639,18 +612,11 @@ func (consensus *Consensus) NumSignaturesIncludedInBlock(block *types.Block) uin return count } -// GetLogger returns logger for consensus contexts added. -func (consensus *Consensus) GetLogger() *zerolog.Logger { - consensus.mutex.RLock() - defer consensus.mutex.RUnlock() - return consensus.getLogger() -} - // getLogger returns logger for consensus contexts added func (consensus *Consensus) getLogger() *zerolog.Logger { logger := utils.Logger().With(). - Uint64("myBlock", consensus.blockNum). - Uint64("myViewID", consensus.getCurBlockViewID()). + Uint64("myBlock", consensus.BlockNum()). + Uint64("myViewID", consensus.GetCurBlockViewID()). Str("phase", consensus.phase.String()). Str("mode", consensus.current.Mode().String()). 
Logger() diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 99bccf755..16554d039 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -4,13 +4,13 @@ import ( "bytes" "context" "encoding/hex" - "math/big" + "fmt" "sync/atomic" "time" - "github.com/ethereum/go-ethereum/common" bls2 "github.com/harmony-one/bls/ffi/go/bls" "github.com/harmony-one/harmony/consensus/signature" + nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/utils" @@ -46,24 +46,16 @@ const ( // IsViewChangingMode return true if current mode is viewchanging func (consensus *Consensus) IsViewChangingMode() bool { - consensus.mutex.RLock() - defer consensus.mutex.RUnlock() - return consensus.isViewChangingMode() -} - -func (consensus *Consensus) isViewChangingMode() bool { return consensus.current.Mode() == ViewChanging } // HandleMessageUpdate will update the consensus state according to received message func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, msg *msg_pb.Message, senderKey *bls.SerializedPublicKey) error { - consensus.mutex.Lock() - defer consensus.mutex.Unlock() // when node is in ViewChanging mode, it still accepts normal messages into FBFTLog // in order to avoid possible trap forever but drop PREPARE and COMMIT // which are message types specifically for a node acting as leader // so we just ignore those messages - if consensus.isViewChangingMode() && + if consensus.IsViewChangingMode() && (msg.Type == msg_pb.MessageType_PREPARE || msg.Type == msg_pb.MessageType_COMMIT) { return nil @@ -95,7 +87,7 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, msg *msg_pb members := consensus.Decider.Participants() fbftMsg, err = ParseNewViewMessage(msg, members) default: - fbftMsg, err = consensus.parseFBFTMessage(msg) + fbftMsg, err = consensus.ParseFBFTMessage(msg) } if err != nil || fbftMsg == nil { return errors.Wrapf(err, "unable to parse consensus msg with type: %s", msg.Type) @@ -103,8 +95,8 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, msg *msg_pb canHandleViewChange := true intendedForValidator, intendedForLeader := - !consensus.isLeader(), - consensus.isLeader() + !consensus.IsLeader(), + consensus.IsLeader() // if in backup normal mode, force ignore view change event and leader event. if consensus.current.Mode() == NormalBackup { @@ -139,14 +131,15 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, msg *msg_pb } func (consensus *Consensus) finalCommit() { + // THIS IS NOT A GOOD PLACE FOR LEADER SWITCHING numCommits := consensus.Decider.SignersCount(quorum.Commit) consensus.getLogger().Info(). Int64("NumCommits", numCommits). Msg("[finalCommit] Finalizing Consensus") - beforeCatchupNum := consensus.getBlockNum() + beforeCatchupNum := consensus.BlockNum() - leaderPriKey, err := consensus.getConsensusLeaderPrivateKey() + leaderPriKey, err := consensus.GetConsensusLeaderPrivateKey() if err != nil { consensus.getLogger().Error().Err(err).Msg("[finalCommit] leader not found") return @@ -185,7 +178,7 @@ func (consensus *Consensus) finalCommit() { // Note: leader already sent 67% commit in preCommit. The 100% commit won't be sent immediately // to save network traffic. It will only be sent in retry if consensus doesn't move forward. // Or if the leader is changed for next block, the 100% committed sig will be sent to the next leader immediately.
- if !consensus.isLeader() || block.IsLastBlockInEpoch() { + if !consensus.IsLeader() || block.IsLastBlockInEpoch() { // send immediately if err := consensus.msgSender.SendWithRetry( block.NumberU64(), @@ -250,7 +243,7 @@ func (consensus *Consensus) finalCommit() { // If still the leader, send commit sig/bitmap to finish the new block proposal, // else, the block proposal will timeout by itself. - if consensus.isLeader() { + if consensus.IsLeader() { if block.IsLastBlockInEpoch() { // No pipelining go func() { @@ -298,29 +291,143 @@ func (consensus *Consensus) BlockCommitSigs(blockNum uint64) ([]byte, error) { // Start waits for the next new block and run consensus func (consensus *Consensus) Start( - stopChan chan struct{}, + blockChannel chan *types.Block, stopChan, stoppedChan, startChannel chan struct{}, ) { go func() { + toStart := make(chan struct{}, 1) + isInitialLeader := consensus.IsLeader() + if isInitialLeader { + consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Waiting for consensus start") + // send a signal to indicate it's ready to run consensus + // this signal is consumed by node object to create a new block and in turn trigger a new consensus on it + go func() { + <-startChannel + toStart <- struct{}{} + consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Send ReadySignal") + consensus.ReadySignal <- SyncProposal + }() + } consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Consensus started") - go func() { - ticker := time.NewTicker(250 * time.Millisecond) - defer ticker.Stop() - for { - select { - case <-stopChan: - return - case <-ticker.C: - consensus.Tick() - } - } - }() - - consensus.mutex.Lock() + defer close(stoppedChan) + ticker := time.NewTicker(250 * time.Millisecond) + defer ticker.Stop() consensus.consensusTimeout[timeoutBootstrap].Start() consensus.getLogger().Info().Msg("[ConsensusMainLoop] Start bootstrap timeout (only once)") + // Set up next block due time. consensus.NextBlockDue = time.Now().Add(consensus.BlockPeriod) - consensus.mutex.Unlock() + start := false + for { + select { + case <-toStart: + start = true + case <-ticker.C: + if !start && isInitialLeader { + continue + } + for k, v := range consensus.consensusTimeout { + // stop timer in listening mode + if consensus.current.Mode() == Listening { + v.Stop() + continue + } + + if consensus.current.Mode() == Syncing { + // never stop bootstrap timer here in syncing mode as it only starts once + // if it is stopped, bootstrap will be stopped and nodes + // can't start view change or join consensus + // the bootstrap timer will be stopped once consensus is reached or view change + // is succeeded + if k != timeoutBootstrap { + consensus.getLogger().Debug(). + Str("k", k.String()). + Str("Mode", consensus.current.Mode().String()). 
+ Msg("[ConsensusMainLoop] consensusTimeout stopped!!!") + v.Stop() + continue + } + } + if !v.CheckExpire() { + continue + } + if k != timeoutViewChange { + consensus.getLogger().Warn().Msg("[ConsensusMainLoop] Ops Consensus Timeout!!!") + consensus.startViewChange() + break + } else { + consensus.getLogger().Warn().Msg("[ConsensusMainLoop] Ops View Change Timeout!!!") + consensus.startViewChange() + break + } + } + + // TODO: Refactor this piece of code to consensus/downloader.go after DNS legacy sync is removed + case <-consensus.syncReadyChan: + consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan") + consensus.mutex.Lock() + if consensus.BlockNum() < consensus.Blockchain().CurrentHeader().Number().Uint64()+1 { + consensus.SetBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1) + consensus.SetViewIDs(consensus.Blockchain().CurrentHeader().ViewID().Uint64() + 1) + mode := consensus.UpdateConsensusInformation() + consensus.current.SetMode(mode) + consensus.getLogger().Info().Msg("[syncReadyChan] Start consensus timer") + consensus.consensusTimeout[timeoutConsensus].Start() + consensus.getLogger().Info().Str("Mode", mode.String()).Msg("Node is IN SYNC") + consensusSyncCounterVec.With(prometheus.Labels{"consensus": "in_sync"}).Inc() + } else if consensus.Mode() == Syncing { + // Corner case where sync is triggered before `onCommitted` and there is a race + // for block insertion between consensus and downloader. + mode := consensus.UpdateConsensusInformation() + consensus.SetMode(mode) + consensus.getLogger().Info().Msg("[syncReadyChan] Start consensus timer") + consensus.consensusTimeout[timeoutConsensus].Start() + consensusSyncCounterVec.With(prometheus.Labels{"consensus": "in_sync"}).Inc() + } + consensus.mutex.Unlock() + + // TODO: Refactor this piece of code to consensus/downloader.go after DNS legacy sync is removed + case <-consensus.syncNotReadyChan: + consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan") + consensus.SetBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1) + consensus.current.SetMode(Syncing) + consensus.getLogger().Info().Msg("[ConsensusMainLoop] Node is OUT OF SYNC") + consensusSyncCounterVec.With(prometheus.Labels{"consensus": "out_of_sync"}).Inc() + + case newBlock := <-blockChannel: + //consensus.ReshardingNextLeader(newBlock) + consensus.getLogger().Info(). + Uint64("MsgBlockNum", newBlock.NumberU64()). + Msg("[ConsensusMainLoop] Received Proposed New Block!") + + if newBlock.NumberU64() < consensus.BlockNum() { + consensus.getLogger().Warn().Uint64("newBlockNum", newBlock.NumberU64()). + Msg("[ConsensusMainLoop] received old block, abort") + continue + } + // Sleep to wait for the full block time + consensus.getLogger().Info().Msg("[ConsensusMainLoop] Waiting for Block Time") + + <-time.After(time.Until(consensus.NextBlockDue)) + consensus.StartFinalityCount() + + // Update time due for next block + consensus.NextBlockDue = time.Now().Add(consensus.BlockPeriod) + + startTime = time.Now() + consensus.msgSender.Reset(newBlock.NumberU64()) + + consensus.getLogger().Info(). + Int("numTxs", len(newBlock.Transactions())). + Int("numStakingTxs", len(newBlock.StakingTransactions())). + Time("startTime", startTime). + Int64("publicKeys", consensus.Decider.ParticipantsCount()). 
+ Msg("[ConsensusMainLoop] STARTING CONSENSUS") + consensus.announce(newBlock) + case <-stopChan: + consensus.getLogger().Info().Msg("[ConsensusMainLoop] stopChan") + return + } + } }() if consensus.dHelper != nil { @@ -328,129 +435,28 @@ func (consensus *Consensus) Start( } } -func (consensus *Consensus) StartChannel() { - consensus.mutex.Lock() - consensus.isInitialLeader = consensus.isLeader() - if consensus.isInitialLeader { - consensus.start = true - consensus.getLogger().Info().Time("time", time.Now()).Msg("[ConsensusMainLoop] Send ReadySignal") - consensus.mutex.Unlock() - consensus.ReadySignal <- SyncProposal - return - } - consensus.mutex.Unlock() -} - -func (consensus *Consensus) syncReadyChan() { - consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncReadyChan") - if consensus.getBlockNum() < consensus.Blockchain().CurrentHeader().Number().Uint64()+1 { - consensus.setBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1) - consensus.setViewIDs(consensus.Blockchain().CurrentHeader().ViewID().Uint64() + 1) - mode := consensus.updateConsensusInformation() - consensus.current.SetMode(mode) - consensus.getLogger().Info().Msg("[syncReadyChan] Start consensus timer") - consensus.consensusTimeout[timeoutConsensus].Start() - consensus.getLogger().Info().Str("Mode", mode.String()).Msg("Node is IN SYNC") - consensusSyncCounterVec.With(prometheus.Labels{"consensus": "in_sync"}).Inc() - } else if consensus.mode() == Syncing { - // Corner case where sync is triggered before `onCommitted` and there is a race - // for block insertion between consensus and downloader. - mode := consensus.updateConsensusInformation() - consensus.setMode(mode) - consensus.getLogger().Info().Msg("[syncReadyChan] Start consensus timer") - consensus.consensusTimeout[timeoutConsensus].Start() - consensusSyncCounterVec.With(prometheus.Labels{"consensus": "in_sync"}).Inc() - } -} - -func (consensus *Consensus) syncNotReadyChan() { - consensus.getLogger().Info().Msg("[ConsensusMainLoop] syncNotReadyChan") - consensus.setBlockNum(consensus.Blockchain().CurrentHeader().Number().Uint64() + 1) - consensus.current.SetMode(Syncing) - consensus.getLogger().Info().Msg("[ConsensusMainLoop] Node is OUT OF SYNC") - consensusSyncCounterVec.With(prometheus.Labels{"consensus": "out_of_sync"}).Inc() -} - -func (consensus *Consensus) Tick() { - consensus.mutex.Lock() - defer consensus.mutex.Unlock() - consensus.tick() -} - -func (consensus *Consensus) tick() { - if !consensus.start && consensus.isInitialLeader { - return - } - for k, v := range consensus.consensusTimeout { - // stop timer in listening mode - if consensus.current.Mode() == Listening { - v.Stop() - continue - } - - if consensus.current.Mode() == Syncing { - // never stop bootstrap timer here in syncing mode as it only starts once - // if it is stopped, bootstrap will be stopped and nodes - // can't start view change or join consensus - // the bootstrap timer will be stopped once consensus is reached or view change - // is succeeded - if k != timeoutBootstrap { - consensus.getLogger().Debug(). - Str("k", k.String()). - Str("Mode", consensus.current.Mode().String()). 
- Msg("[ConsensusMainLoop] consensusTimeout stopped!!!") - v.Stop() - continue - } - } - if !v.CheckExpire() { - continue - } - if k != timeoutViewChange { - consensus.getLogger().Warn().Msg("[ConsensusMainLoop] Ops Consensus Timeout!!!") - consensus.startViewChange() - break - } else { - consensus.getLogger().Warn().Msg("[ConsensusMainLoop] Ops View Change Timeout!!!") - consensus.startViewChange() - break - } +// Close closes the consensus. If current is in normal commit phase, wait until the commit +// phase end. +func (consensus *Consensus) Close() error { + if consensus.dHelper != nil { + consensus.dHelper.close() } + consensus.waitForCommit() + return nil } -func (consensus *Consensus) BlockChannel(newBlock *types.Block) { - consensus.GetLogger().Info(). - Uint64("MsgBlockNum", newBlock.NumberU64()). - Msg("[ConsensusMainLoop] Received Proposed New Block!") - - if newBlock.NumberU64() < consensus.BlockNum() { - consensus.getLogger().Warn().Uint64("newBlockNum", newBlock.NumberU64()). - Msg("[ConsensusMainLoop] received old block, abort") +// waitForCommit wait extra 2 seconds for commit phase to finish +func (consensus *Consensus) waitForCommit() { + if consensus.Mode() != Normal || consensus.phase.Get() != FBFTCommit { return } - // Sleep to wait for the full block time - consensus.getLogger().Info().Msg("[ConsensusMainLoop] Waiting for Block Time") - time.AfterFunc(time.Until(consensus.NextBlockDue), func() { - consensus.StartFinalityCount() - consensus.mutex.Lock() - defer consensus.mutex.Unlock() - // Update time due for next block - consensus.NextBlockDue = time.Now().Add(consensus.BlockPeriod) - - startTime = time.Now() - consensus.msgSender.Reset(newBlock.NumberU64()) + // We only need to wait consensus is in normal commit phase + utils.Logger().Warn().Str("phase", consensus.phase.String()).Msg("[shutdown] commit phase has to wait") - consensus.getLogger().Info(). - Int("numTxs", len(newBlock.Transactions())). - Int("numStakingTxs", len(newBlock.StakingTransactions())). - Time("startTime", startTime). - Int64("publicKeys", consensus.Decider.ParticipantsCount()). 
- Msg("[ConsensusMainLoop] STARTING CONSENSUS") - consensus.announce(newBlock) - }) - - if consensus.dHelper != nil { - consensus.dHelper.start() + maxWait := time.Now().Add(2 * consensus.BlockPeriod) + for time.Now().Before(maxWait) && consensus.GetConsensusPhase() == "Commit" { + utils.Logger().Warn().Msg("[shutdown] wait for consensus finished") + time.Sleep(time.Millisecond * 100) } } @@ -465,29 +471,24 @@ type LastMileBlockIter struct { } // GetLastMileBlockIter get the iterator of the last mile blocks starting from number bnStart -func (consensus *Consensus) GetLastMileBlockIter(bnStart uint64, cb func(iter *LastMileBlockIter) error) error { +func (consensus *Consensus) GetLastMileBlockIter(bnStart uint64) (*LastMileBlockIter, error) { consensus.mutex.Lock() defer consensus.mutex.Unlock() - return consensus.getLastMileBlockIter(bnStart, cb) -} - -// GetLastMileBlockIter get the iterator of the last mile blocks starting from number bnStart -func (consensus *Consensus) getLastMileBlockIter(bnStart uint64, cb func(iter *LastMileBlockIter) error) error { if consensus.BlockVerifier == nil { - return errors.New("consensus haven't initialized yet") + return nil, errors.New("consensus haven't initialized yet") } blocks, _, err := consensus.getLastMileBlocksAndMsg(bnStart) if err != nil { - return err + return nil, err } - return cb(&LastMileBlockIter{ + return &LastMileBlockIter{ blockCandidates: blocks, fbftLog: consensus.FBFTLog, verify: consensus.BlockVerifier, curIndex: 0, logger: consensus.getLogger(), - }) + }, nil } // Next iterate to the next last mile block @@ -530,11 +531,12 @@ func (consensus *Consensus) getLastMileBlocksAndMsg(bnStart uint64) ([]*types.Bl // preCommitAndPropose commit the current block with 67% commit signatures and start // proposing new block which will wait on the full commit signatures to finish func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error { + //fmt.Println("preCommitAndPropose", utils.GetPort(), blk.NumberU64()) if blk == nil { return errors.New("block to pre-commit is nil") } - leaderPriKey, err := consensus.getConsensusLeaderPrivateKey() + leaderPriKey, err := consensus.GetConsensusLeaderPrivateKey() if err != nil { consensus.getLogger().Error().Err(err).Msg("[preCommitAndPropose] leader not found") return err @@ -624,7 +626,7 @@ func (consensus *Consensus) tryCatchup() error { if consensus.BlockVerifier == nil { return errors.New("consensus haven't finished initialization") } - initBN := consensus.getBlockNum() + initBN := consensus.BlockNum() defer consensus.postCatchup(initBN) blks, msgs, err := consensus.getLastMileBlocksAndMsg(initBN) @@ -638,7 +640,7 @@ func (consensus *Consensus) tryCatchup() error { } blk.SetCurrentCommitSig(msg.Payload) - if err := consensus.verifyBlock(blk); err != nil { + if err := consensus.VerifyBlock(blk); err != nil { consensus.getLogger().Err(err).Msg("[TryCatchup] failed block verifier") return err } @@ -647,6 +649,7 @@ func (consensus *Consensus) tryCatchup() error { consensus.getLogger().Error().Err(err).Msg("[TryCatchup] Failed to add block to chain") return err } + //fmt.Println("tryCatchup ", utils.GetPort(), blk.NumberU64()) select { // TODO: Remove this when removing dns sync and stream sync is fully up case consensus.VerifiedNewBlock <- blk: @@ -661,6 +664,8 @@ func (consensus *Consensus) tryCatchup() error { } func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMessage) error { + // this function evaluates for all, leader and validators. 
+ if consensus.Blockchain().CurrentBlock().NumberU64() < blk.NumberU64() { if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk.Hash())); err != nil { consensus.getLogger().Error().Err(err).Msg("[commitBlock] Failed to add block to chain") @@ -674,102 +679,58 @@ func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMess } consensus.FinishFinalityCount() - go func() { - consensus.PostConsensusJob(blk) - }() - consensus.setupForNewConsensus(blk, committedMsg) + consensus.PostConsensusJob(blk) + consensus.SetupForNewConsensus(blk, committedMsg) utils.Logger().Info().Uint64("blockNum", blk.NumberU64()). Str("hash", blk.Header().Hash().Hex()). Msg("Added New Block to Blockchain!!!") - return nil } -// rotateLeader rotates the leader to the next leader in the committee. -// This function must be called with enabled leader rotation. -func (consensus *Consensus) rotateLeader(epoch *big.Int) { - prev := consensus.getLeaderPubKey() - bc := consensus.Blockchain() - curNumber := bc.CurrentHeader().Number().Uint64() - utils.Logger().Info().Msgf("[Rotating leader] epoch: %v rotation:%v numblocks:%d", epoch.Uint64(), bc.Config().IsLeaderRotation(epoch), bc.Config().LeaderRotationBlocksCount) - leader := consensus.getLeaderPubKey() - for i := 0; i < bc.Config().LeaderRotationBlocksCount; i++ { - header := bc.GetHeaderByNumber(curNumber - uint64(i)) - if header == nil { - return - } - // Previous epoch, we should not change leader. - if header.Epoch().Uint64() != epoch.Uint64() { - return - } - // Check if the same leader. - pub, err := bc.GetLeaderPubKeyFromCoinbase(header) - if err != nil { - utils.Logger().Error().Err(err).Msg("Failed to get leader public key from coinbase") - return - } - if !pub.Object.IsEqual(leader.Object) { - // Another leader. - return +// SetupForNewConsensus sets the state for new consensus +func (consensus *Consensus) SetupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) { + atomic.StoreUint64(&consensus.blockNum, blk.NumberU64()+1) + curBlockViewID := consensus.SetCurBlockViewID(committedMsg.ViewID + 1) // first view id is going to be 2. + prev := consensus.GetLeaderPubKey() + idx := consensus.SetLeaderIndex(func(i int) int { + if curBlockViewID%3 == 0 { + return i + 1 } - } - // Passed all checks, we can change leader. 
- var ( - wasFound bool - next *bls.PublicKeyWrapper - ) - if consensus.ShardID == shard.BeaconChainShardID { - wasFound, next = consensus.Decider.NthNextHmy(shard.Schedule.InstanceForEpoch(epoch), leader, 1) - } else { - wasFound, next = consensus.Decider.NthNext(leader, 1) - } - if !wasFound { - utils.Logger().Error().Msg("Failed to get next leader") - return - } else { - consensus.setLeaderPubKey(next) - } - if consensus.isLeader() && !consensus.getLeaderPubKey().Object.IsEqual(prev.Object) { + return i + }) + pps := consensus.Decider.Participants() + consensus.pubKeyLock.Lock() + consensus.LeaderPubKey = &pps[idx%len(pps)] + fmt.Printf("SetupForNewConsensus :%d idx: %d future v%d new: %s prev: %s %t\n", utils.GetPort(), idx, curBlockViewID, consensus.LeaderPubKey.Bytes.Hex(), prev.Bytes.Hex(), consensus.isLeader()) + consensus.pubKeyLock.Unlock() + if consensus.IsLeader() && !consensus.GetLeaderPubKey().Object.IsEqual(prev.Object) { // leader changed go func() { + fmt.Printf("ReadySignal :%d for leader %s\n", utils.GetPort(), consensus.GetLeaderPubKey().Bytes.Hex()) + defer fmt.Printf("Defer ReadySignal :%d for leader %s\n", utils.GetPort(), consensus.GetLeaderPubKey().Bytes.Hex()) consensus.ReadySignal <- SyncProposal + }() - } -} -// SetupForNewConsensus sets the state for new consensus -func (consensus *Consensus) setupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) { - atomic.StoreUint64(&consensus.blockNum, blk.NumberU64()+1) - consensus.setCurBlockViewID(committedMsg.ViewID + 1) - consensus.LeaderPubKey = committedMsg.SenderPubkeys[0] - var epoch *big.Int - if blk.IsLastBlockInEpoch() { - epoch = new(big.Int).Add(blk.Epoch(), common.Big1) - } else { - epoch = blk.Epoch() } - if consensus.Blockchain().Config().IsLeaderRotation(epoch) { - consensus.rotateLeader(epoch) - } - // Update consensus keys at last so the change of leader status doesn't mess up normal flow if blk.IsLastBlockInEpoch() { - consensus.setMode(consensus.updateConsensusInformation()) + consensus.SetMode(consensus.UpdateConsensusInformation()) } consensus.FBFTLog.PruneCacheBeforeBlock(blk.NumberU64()) - consensus.resetState() + consensus.ResetState() } func (consensus *Consensus) postCatchup(initBN uint64) { - if initBN < consensus.getBlockNum() { + if initBN < consensus.BlockNum() { consensus.getLogger().Info(). Uint64("From", initBN). - Uint64("To", consensus.getBlockNum()). + Uint64("To", consensus.BlockNum()). Msg("[TryCatchup] Caught up!") consensus.switchPhase("TryCatchup", FBFTAnnounce) } // catch up and skip from view change trap - if initBN < consensus.getBlockNum() && consensus.isViewChangingMode() { + if initBN < consensus.BlockNum() && consensus.IsViewChangingMode() { consensus.current.SetMode(Normal) consensus.consensusTimeout[timeoutViewChange].Stop() } @@ -777,7 +738,7 @@ // GenerateVrfAndProof generates new VRF/Proof from hash of previous block func (consensus *Consensus) GenerateVrfAndProof(newHeader *block.Header) error { - key, err := consensus.getConsensusLeaderPrivateKey() + key, err := consensus.GetConsensusLeaderPrivateKey() if err != nil { return errors.New("[GenerateVrfAndProof] no leader private key provided") } @@ -830,7 +791,7 @@ func (consensus *Consensus) GenerateVdfAndProof(newBlock *types.Block, vrfBlockN start := time.Now() vdf.Execute() duration := time.Since(start) - consensus.GetLogger().Info(). + consensus.getLogger().Info(). Dur("duration", duration).
Msg("[ConsensusMainLoop] VDF computation finished") output := <-outputChannel diff --git a/consensus/leader.go b/consensus/leader.go index 2f7766e19..21f3b770d 100644 --- a/consensus/leader.go +++ b/consensus/leader.go @@ -190,7 +190,17 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) { } func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) { + // TODO HERE + //if recvMsg.ViewID == 10 { + // return + //} + consensus.mutex.Lock() + defer consensus.mutex.Unlock() //// Read - Start + if consensus.ShardID == 0 { + //fmt.Println("onCommit ", recvMsg.BlockNum) + } + if !consensus.isRightBlockNumAndViewID(recvMsg) { return } @@ -322,4 +332,5 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) { consensus.msgSender.StopRetry(msg_pb.MessageType_PREPARED) } + //fmt.Println("onCommit99: ", utils.GetPort(), recvMsg.BlockNum) } diff --git a/consensus/quorum/quorum.go b/consensus/quorum/quorum.go index aba62fa53..52b765a88 100644 --- a/consensus/quorum/quorum.go +++ b/consensus/quorum/quorum.go @@ -230,12 +230,17 @@ func (s *cIdentities) NthNextHmy(instance shardingconfig.Instance, pubKey *bls.P Msg("[NthNextHmy] pubKey not found") } numNodes := instance.NumHarmonyOperatedNodesPerShard() + //fmt.Println("??idx:", idx, numNodes) // sanity check to avoid out of bound access if numNodes <= 0 || numNodes > len(s.publicKeys) { numNodes = len(s.publicKeys) } idx = (idx + next) % numNodes - return found, &s.publicKeys[idx] + //fmt.Println("-------idx:", idx) + + new := &s.publicKeys[idx] + fmt.Println("NthNextHmy: ", pubKey.Bytes.Hex(), new.Bytes.Hex()) + return found, new } // NthNextHmyExt return the Nth next pubkey of Harmony + allowlist nodes, next can be negative number diff --git a/consensus/validator.go b/consensus/validator.go index f85cb8e3d..013769395 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -18,7 +18,8 @@ import ( ) func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { - recvMsg, err := consensus.parseFBFTMessage(msg) + + recvMsg, err := consensus.ParseFBFTMessage(msg) if err != nil { consensus.getLogger().Error(). Err(err). @@ -26,6 +27,9 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { Msg("[OnAnnounce] Unparseable leader message") return } + if consensus.ShardID == 0 { + //fmt.Println("onAnnounce called ", recvMsg.BlockNum) + } // NOTE let it handle its own logs if !consensus.onAnnounceSanityChecks(recvMsg) { @@ -38,10 +42,12 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { Uint64("MsgBlockNum", recvMsg.BlockNum). Msg("[OnAnnounce] Announce message Added") consensus.FBFTLog.AddVerifiedMessage(recvMsg) + consensus.mutex.Lock() + defer consensus.mutex.Unlock() consensus.blockHash = recvMsg.BlockHash // we have already added message and block, skip check viewID // and send prepare message if is in ViewChanging mode - if consensus.isViewChangingMode() { + if consensus.IsViewChangingMode() { consensus.getLogger().Debug(). Msg("[OnAnnounce] Still in ViewChanging Mode, Exiting !!") return @@ -62,7 +68,7 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { if len(recvMsg.Block) > 0 { go func() { // Best effort check, no need to error out. - _, err := consensus.ValidateNewBlock(recvMsg) + _, err := consensus.validateNewBlock(recvMsg) if err == nil { consensus.getLogger().Info(). 
@@ -72,12 +78,11 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { } } -func (consensus *Consensus) ValidateNewBlock(recvMsg *FBFTMessage) (*types.Block, error) { - consensus.mutex.Lock() - defer consensus.mutex.Unlock() - return consensus.validateNewBlock(recvMsg) -} func (consensus *Consensus) validateNewBlock(recvMsg *FBFTMessage) (*types.Block, error) { + // Lock to prevent race condition between announce and prepare + consensus.verifyBlockMutex.Lock() + defer consensus.verifyBlockMutex.Unlock() + if consensus.FBFTLog.IsBlockVerified(recvMsg.BlockHash) { var blockObj *types.Block @@ -131,7 +136,7 @@ func (consensus *Consensus) validateNewBlock(recvMsg *FBFTMessage) (*types.Block return nil, errors.New("nil block verifier") } - if err := consensus.verifyBlock(&blockObj); err != nil { + if err := consensus.VerifyBlock(&blockObj); err != nil { consensus.getLogger().Error().Err(err).Msg("[validateNewBlock] Block verification failed") return nil, errors.New("Block verification failed") } @@ -183,6 +188,12 @@ func (consensus *Consensus) sendCommitMessages(blockObj *types.Block) { // if onPrepared accepts the prepared message from the leader, then // it will send a COMMIT message for the leader to receive on the network. func (consensus *Consensus) onPrepared(recvMsg *FBFTMessage) { + if consensus.ShardID == 0 { + //fmt.Println("onPrepared", recvMsg.BlockNum) + } + consensus.mutex.Lock() + defer consensus.mutex.Unlock() + consensus.getLogger().Info(). Uint64("MsgBlockNum", recvMsg.BlockNum). Uint64("MsgViewID", recvMsg.ViewID). @@ -205,7 +216,7 @@ func (consensus *Consensus) onPrepared(recvMsg *FBFTMessage) { // check validity of prepared signature blockHash := recvMsg.BlockHash - aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 0, consensus.Decider.Participants()) + aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 0) if err != nil { consensus.getLogger().Error().Err(err).Msg("ReadSignatureBitmapPayload failed!") return @@ -271,13 +282,11 @@ func (consensus *Consensus) onPrepared(recvMsg *FBFTMessage) { return } curBlockNum := consensus.BlockNum() - consensus.mutex.Lock() - defer consensus.mutex.Unlock() for _, committedMsg := range consensus.FBFTLog.GetNotVerifiedCommittedMessages(blockObj.NumberU64(), blockObj.Header().ViewID().Uint64(), blockObj.Hash()) { if committedMsg != nil { consensus.onCommitted(committedMsg) } - if curBlockNum < consensus.getBlockNum() { + if curBlockNum < consensus.BlockNum() { consensus.getLogger().Info().Msg("[OnPrepared] Successfully caught up with committed message") break } @@ -286,6 +295,9 @@ func (consensus *Consensus) onPrepared(recvMsg *FBFTMessage) { } func (consensus *Consensus) onCommitted(recvMsg *FBFTMessage) { + consensus.mutex.Lock() + defer consensus.mutex.Unlock() + consensus.getLogger().Info(). Uint64("MsgBlockNum", recvMsg.BlockNum). Uint64("MsgViewID", recvMsg.ViewID). 
@@ -380,7 +392,7 @@ func (consensus *Consensus) onCommitted(recvMsg *FBFTMessage) { return } - if consensus.isViewChangingMode() { + if consensus.IsViewChangingMode() { consensus.getLogger().Info().Msg("[OnCommitted] Still in ViewChanging mode, Exiting!!") return } @@ -394,6 +406,13 @@ consensus.getLogger().Info().Msg("[OnCommitted] Start consensus timer (new block added)") consensus.consensusTimeout[timeoutConsensus].Start() } + + //fmt.Println("onCommitted", utils.GetPort(), recvMsg.BlockNum) + if blk != nil { + //consensus.ReshardingNextLeader(blk) + } else { + //fmt.Println("onCommitted", utils.GetPort(), recvMsg.BlockNum, "blk is nil") + } } // Collect private keys that are part of the current committee. @@ -401,7 +420,7 @@ func (consensus *Consensus) getPriKeysInCommittee() []*bls.PrivateKeyWrapper { priKeys := []*bls.PrivateKeyWrapper{} for i, key := range consensus.priKey { - if !consensus.isValidatorInCommittee(key.Pub.Bytes) { + if !consensus.IsValidatorInCommittee(key.Pub.Bytes) { continue } priKeys = append(priKeys, &consensus.priKey[i]) diff --git a/consensus/view_change.go b/consensus/view_change.go index aafdfd121..2936fba01 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -1,7 +1,9 @@ package consensus import ( + "fmt" "math/big" + "sync" "time" "github.com/harmony-one/harmony/internal/chain" @@ -24,21 +26,26 @@ const MaxViewIDDiff = 249 // State contains current mode and current viewID type State struct { - mode Mode + mode Mode + modeMux sync.RWMutex // current view id in normal mode // it changes per successful consensus blockViewID uint64 + cViewMux sync.RWMutex // view changing id is used during view change mode // it is the next view id viewChangingID uint64 + viewMux sync.RWMutex isBackup bool } // Mode return the current node mode func (pm *State) Mode() Mode { + pm.modeMux.RLock() + defer pm.modeMux.RUnlock() return pm.mode } @@ -48,16 +55,22 @@ func (pm *State) SetMode(s Mode) { s = NormalBackup } + pm.modeMux.Lock() + defer pm.modeMux.Unlock() pm.mode = s } // GetCurBlockViewID return the current view id func (pm *State) GetCurBlockViewID() uint64 { + pm.cViewMux.RLock() + defer pm.cViewMux.RUnlock() return pm.blockViewID } // SetCurBlockViewID sets the current view id func (pm *State) SetCurBlockViewID(viewID uint64) uint64 { + pm.cViewMux.Lock() + defer pm.cViewMux.Unlock() pm.blockViewID = viewID return pm.blockViewID } @@ -65,18 +78,26 @@ // GetViewChangingID return the current view changing id // It is meaningful during view change mode func (pm *State) GetViewChangingID() uint64 { + pm.viewMux.RLock() + defer pm.viewMux.RUnlock() return pm.viewChangingID } // SetViewChangingID set the current view changing id // It is meaningful during view change mode func (pm *State) SetViewChangingID(id uint64) { + pm.viewMux.Lock() + defer pm.viewMux.Unlock() pm.viewChangingID = id } // GetViewChangeDuraion return the duration of the current view change // It increases with the square of the difference between view changing ID and current view ID func (pm *State) GetViewChangeDuraion() time.Duration { + pm.viewMux.RLock() + pm.cViewMux.RLock() + defer pm.viewMux.RUnlock() + defer pm.cViewMux.RUnlock() diff := int64(pm.viewChangingID - pm.blockViewID) return time.Duration(diff * diff * int64(viewChangeDuration)) } @@ -88,14 +109,14 @@ func (pm *State) SetIsBackup(isBackup
bool) { // fallbackNextViewID return the next view ID and duration when there is an exception // to calculate the time-based viewId func (consensus *Consensus) fallbackNextViewID() (uint64, time.Duration) { - diff := int64(consensus.getViewChangingID() + 1 - consensus.getCurBlockViewID()) + diff := int64(consensus.GetViewChangingID() + 1 - consensus.GetCurBlockViewID()) if diff <= 0 { diff = int64(1) } consensus.getLogger().Error(). Int64("diff", diff). Msg("[fallbackNextViewID] use legacy viewID algorithm") - return consensus.getViewChangingID() + 1, time.Duration(diff * diff * int64(viewChangeDuration)) + return consensus.GetViewChangingID() + 1, time.Duration(diff * diff * int64(viewChangeDuration)) } // getNextViewID return the next view ID based on the timestamp @@ -141,6 +162,7 @@ func (consensus *Consensus) getNextViewID() (uint64, time.Duration) { Uint64("stuckBlockViewID", stuckBlockViewID). Msg("[getNextViewID]") + fmt.Println("end getNextViewID: ", nextViewID, viewChangeDuration) // duration is always the fixed view change duration for synchronous view change return nextViewID, viewChangeDuration } @@ -152,9 +174,9 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64) *bls.PublicKeyWrapper { gap := 1 - cur := consensus.getCurBlockViewID() + cur := consensus.GetCurBlockViewID() if viewID > cur { - gap = int(viewID - cur) + gap = int(viewID - consensus.GetCurBlockViewID()) } var lastLeaderPubKey *bls.PublicKeyWrapper var err error @@ -196,29 +218,24 @@ Str("leaderPubKey", consensus.LeaderPubKey.Bytes.Hex()). Int("gap", gap). Uint64("newViewID", viewID). - Uint64("myCurBlockViewID", consensus.getCurBlockViewID()). + Uint64("myCurBlockViewID", consensus.GetCurBlockViewID()). Msg("[getNextLeaderKey] got leaderPubKey from coinbase") // wasFound, next := consensus.Decider.NthNext(lastLeaderPubKey, gap) // FIXME: rotate leader on harmony nodes only before fully externalization var wasFound bool var next *bls.PublicKeyWrapper - if blockchain != nil && blockchain.Config().IsLeaderRotation(epoch) { - if consensus.ShardID == shard.BeaconChainShardID { - wasFound, next = consensus.Decider.NthNextHmy( - shard.Schedule.InstanceForEpoch(epoch), - lastLeaderPubKey, - gap) - } else { - wasFound, next = consensus.Decider.NthNext( - lastLeaderPubKey, - gap) - } + if blockchain != nil && blockchain.Config().IsAllowlistEpoch(epoch) { + wasFound, next = consensus.Decider.NthNextHmyExt( + shard.Schedule.InstanceForEpoch(epoch), + lastLeaderPubKey, + gap) } else { wasFound, next = consensus.Decider.NthNextHmy( shard.Schedule.InstanceForEpoch(epoch), lastLeaderPubKey, gap) } + fmt.Println("wasfoundNext", blockchain.Config().IsAllowlistEpoch(epoch), wasFound, next.Bytes.Hex(), lastLeaderPubKey.Bytes.Hex()) if !wasFound { consensus.getLogger().Warn(). Str("key", consensus.LeaderPubKey.Bytes.Hex()).
@@ -240,26 +257,35 @@ func createTimeout() map[TimeoutType]*utils.Timeout { // startViewChange start the view change process func (consensus *Consensus) startViewChange() { - if consensus.disableViewChange || consensus.isBackup { + fmt.Printf("Message to send leader111: %d %s \n", utils.GetPort(), consensus.LeaderPubKey.Bytes.Hex()) + if consensus.disableViewChange || consensus.IsBackup() { return } + consensus.mutex.Lock() + defer consensus.mutex.Unlock() consensus.consensusTimeout[timeoutConsensus].Stop() consensus.consensusTimeout[timeoutBootstrap].Stop() consensus.current.SetMode(ViewChanging) nextViewID, duration := consensus.getNextViewID() - consensus.setViewChangingID(nextViewID) + //fmt.Println("startViewChange", nextViewID) + consensus.SetViewChangingID(nextViewID) // TODO: set the Leader PubKey to the next leader for view change // this is dangerous as the leader change is not succeeded yet // we use it this way as in many code we validate the messages // against the consensus.LeaderPubKey variable. // Ideally, we shall use another variable to keep track of the // leader pubkey in viewchange mode - consensus.LeaderPubKey = consensus.getNextLeaderKey(nextViewID) + consensus.pubKeyLock.Lock() + lpk := consensus.getNextLeaderKey(nextViewID) + consensus.LeaderPubKey = lpk + //fmt.Println("Message to send leader cur: ", consensus.LeaderPubKey.Bytes.Hex(), "next: ", lpk.Bytes.Hex()) + //fmt.Println("Message to send leader: ", consensus.LeaderPubKey.Bytes.Hex()) + consensus.pubKeyLock.Unlock() consensus.getLogger().Warn(). Uint64("nextViewID", nextViewID). - Uint64("viewChangingID", consensus.getViewChangingID()). + Uint64("viewChangingID", consensus.GetViewChangingID()). Dur("timeoutDuration", duration). Str("NextLeader", consensus.LeaderPubKey.Bytes.Hex()).
Msg("[startViewChange]") @@ -276,7 +302,7 @@ func (consensus *Consensus) startViewChange() { if err := consensus.vc.InitPayload( consensus.FBFTLog, nextViewID, - consensus.getBlockNum(), + consensus.BlockNum(), consensus.priKey, members); err != nil { consensus.getLogger().Error().Err(err).Msg("[startViewChange] Init Payload Error") @@ -285,12 +311,14 @@ func (consensus *Consensus) startViewChange() { // for view change, send separate view change per public key // do not do multi-sign of view change message for _, key := range consensus.priKey { - if !consensus.isValidatorInCommittee(key.Pub.Bytes) { + if !consensus.IsValidatorInCommittee(key.Pub.Bytes) { continue } + // Тут уже другой leader msgToSend := consensus.constructViewChangeMessage(&key) + fmt.Println("Message to send leader222: ", consensus.LeaderPubKey.Bytes.Hex()) if err := consensus.msgSender.SendWithRetry( - consensus.getBlockNum(), + consensus.BlockNum(), msg_pb.MessageType_VIEWCHANGE, []nodeconfig.GroupID{ nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, @@ -304,7 +332,7 @@ func (consensus *Consensus) startViewChange() { // startNewView stops the current view change func (consensus *Consensus) startNewView(viewID uint64, newLeaderPriKey *bls.PrivateKeyWrapper, reset bool) error { - if !consensus.isViewChangingMode() { + if !consensus.IsViewChangingMode() { return errors.New("not in view changing mode anymore") } @@ -316,7 +344,7 @@ func (consensus *Consensus) startNewView(viewID uint64, newLeaderPriKey *bls.Pri } if err := consensus.msgSender.SendWithRetry( - consensus.getBlockNum(), + consensus.BlockNum(), msg_pb.MessageType_NEWVIEW, []nodeconfig.GroupID{ nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(consensus.ShardID))}, @@ -333,8 +361,8 @@ func (consensus *Consensus) startNewView(viewID uint64, newLeaderPriKey *bls.Pri consensus.current.SetMode(Normal) consensus.consensusTimeout[timeoutViewChange].Stop() - consensus.setViewIDs(viewID) - consensus.resetViewChangeState() + consensus.SetViewIDs(viewID) + consensus.ResetViewChangeState() consensus.consensusTimeout[timeoutConsensus].Start() consensus.getLogger().Info(). @@ -344,15 +372,20 @@ func (consensus *Consensus) startNewView(viewID uint64, newLeaderPriKey *bls.Pri // TODO: consider make ResetState unified and only called in one place like finalizeCommit() if reset { - consensus.resetState() + consensus.ResetState() } - consensus.setLeaderPubKey(newLeaderPriKey.Pub) + fmt.Println("[startNewView]", newLeaderPriKey.Pub.Bytes.Hex()) + consensus.LeaderPubKey = newLeaderPriKey.Pub return nil } // onViewChange is called when the view change message is received. func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) { + //fmt.Printf("[onViewChange] received view change message from %+v\n", recvMsg) + consensus.mutex.Lock() + defer consensus.mutex.Unlock() + consensus.getLogger().Debug(). Uint64("viewID", recvMsg.ViewID). Uint64("blockNum", recvMsg.BlockNum). @@ -361,7 +394,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) { // if not leader, noop newLeaderKey := recvMsg.LeaderPubkey - newLeaderPriKey, err := consensus.getLeaderPrivateKey(newLeaderKey.Object) + newLeaderPriKey, err := consensus.GetLeaderPrivateKey(newLeaderKey.Object) if err != nil { consensus.getLogger().Debug(). Err(err). @@ -372,6 +405,13 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) { return } + consensus.getLogger().Debug(). + Err(err). + Interface("SenderPubkeys", recvMsg.SenderPubkeys). 
+ Str("NextLeader", recvMsg.LeaderPubkey.Bytes.Hex()). + Str("myBLSPubKey", consensus.priKey.GetPublicKeys().SerializeToHexStr()). + Msg("[onViewChange] I am the Leader") + if consensus.Decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) { consensus.getLogger().Info(). Int64("have", consensus.Decider.SignersCount(quorum.ViewChange)). @@ -414,7 +454,7 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) { } // received enough view change messages, change state to normal consensus - if consensus.Decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) && consensus.isViewChangingMode() { + if consensus.Decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) && consensus.IsViewChangingMode() { // no previous prepared message, go straight to normal mode // and start proposing new block if consensus.vc.IsM1PayloadEmpty() { @@ -449,6 +489,11 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) { // Or the validator will enter announce phase to wait for the new block proposed // from the new leader func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) { + consensus.mutex.Lock() + defer consensus.mutex.Unlock() + + fmt.Printf("[onNewView] received new view message from %+v\n", recvMsg) + consensus.getLogger().Info(). Uint64("viewID", recvMsg.ViewID). Uint64("blockNum", recvMsg.BlockNum). @@ -456,10 +501,10 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) { Msg("[onNewView] Received NewView Message") // change view and leaderKey to keep in sync with network - if consensus.getBlockNum() != recvMsg.BlockNum { + if consensus.BlockNum() != recvMsg.BlockNum { consensus.getLogger().Warn(). Uint64("MsgBlockNum", recvMsg.BlockNum). - Uint64("myBlockNum", consensus.getBlockNum()). + Uint64("myBlockNum", consensus.BlockNum()). Msg("[onNewView] Invalid block number") return } @@ -493,7 +538,7 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) { utils.CountOneBits(m3Mask.Bitmap) > utils.CountOneBits(m2Mask.Bitmap)) { // m1 is not empty, check it's valid blockHash := recvMsg.Payload[:32] - aggSig, mask, err := consensus.readSignatureBitmapPayload(recvMsg.Payload, 32, consensus.Decider.Participants()) + aggSig, mask, err := consensus.ReadSignatureBitmapPayload(recvMsg.Payload, 32) if err != nil { consensus.getLogger().Error().Err(err). 
Msg("[onNewView] ReadSignatureBitmapPayload Failed") @@ -527,7 +572,7 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) { } } - if !consensus.isViewChangingMode() { + if !consensus.IsViewChangingMode() { consensus.getLogger().Info().Msg("Not in ViewChanging Mode.") return } @@ -535,9 +580,12 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) { consensus.consensusTimeout[timeoutViewChange].Stop() // newView message verified success, override my state - consensus.setViewIDs(recvMsg.ViewID) + consensus.SetViewIDs(recvMsg.ViewID) + consensus.pubKeyLock.Lock() + fmt.Println("[onNewView1221] new leader key cur:", consensus.LeaderPubKey.Bytes.Hex(), " new: ", senderKey.Bytes.Hex()) consensus.LeaderPubKey = senderKey - consensus.resetViewChangeState() + consensus.pubKeyLock.Unlock() + consensus.ResetViewChangeState() consensus.msgSender.StopRetry(msg_pb.MessageType_VIEWCHANGE) @@ -546,7 +594,7 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) { consensus.sendCommitMessages(preparedBlock) consensus.switchPhase("onNewView", FBFTCommit) } else { - consensus.resetState() + consensus.ResetState() consensus.getLogger().Info().Msg("onNewView === announce") } consensus.getLogger().Info(). @@ -558,13 +606,6 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) { // ResetViewChangeState resets the view change structure func (consensus *Consensus) ResetViewChangeState() { - consensus.mutex.Lock() - defer consensus.mutex.Unlock() - consensus.resetViewChangeState() -} - -// ResetViewChangeState resets the view change structure -func (consensus *Consensus) resetViewChangeState() { consensus.getLogger().Info(). Str("Phase", consensus.phase.String()). Msg("[ResetViewChangeState] Resetting view change state") diff --git a/hmy/hmy.go b/hmy/hmy.go index 58e8d59ed..8089f04ba 100644 --- a/hmy/hmy.go +++ b/hmy/hmy.go @@ -18,6 +18,7 @@ import ( "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/vm" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" + "github.com/harmony-one/harmony/multibls" commonRPC "github.com/harmony-one/harmony/rpc/common" "github.com/harmony-one/harmony/shard" staking "github.com/harmony-one/harmony/staking/types" @@ -96,6 +97,7 @@ type NodeAPI interface { GetStakingTransactionsCount(address, txType string) (uint64, error) GetTraceResultByHash(hash common.Hash) (json.RawMessage, error) IsCurrentlyLeader() bool + GetPublicKeys() multibls.PublicKeys IsOutOfSync(shardID uint32) bool SyncStatus(shardID uint32) (bool, uint64, uint64) SyncPeers() map[string]int diff --git a/internal/utils/singleton.go b/internal/utils/singleton.go index 10101d767..1b71e981b 100644 --- a/internal/utils/singleton.go +++ b/internal/utils/singleton.go @@ -197,6 +197,7 @@ func updateZeroLogLevel(level int) { zeroLogger = &childLogger } + // GetPort is useful for debugging, returns `--port` flag provided to executable. 
func GetPort() int { ok := false diff --git a/node/api.go b/node/api.go index ceda96808..debcb201f 100644 --- a/node/api.go +++ b/node/api.go @@ -6,6 +6,7 @@ import ( "github.com/harmony-one/harmony/eth/rpc" "github.com/harmony-one/harmony/hmy" "github.com/harmony-one/harmony/internal/tikv" + "github.com/harmony-one/harmony/multibls" "github.com/harmony-one/harmony/rosetta" hmy_rpc "github.com/harmony-one/harmony/rpc" rpc_common "github.com/harmony-one/harmony/rpc/common" @@ -18,6 +19,11 @@ func (node *Node) IsCurrentlyLeader() bool { return node.Consensus.IsLeader() } +// GetPublicKeys returns the node's consensus BLS public keys +func (node *Node) GetPublicKeys() multibls.PublicKeys { + return node.Consensus.GetPrivateKeys().GetPublicKeys() +} + // PeerConnectivity .. func (node *Node) PeerConnectivity() (int, int, int) { return node.host.PeerConnectivity() diff --git a/node/node_newblock.go b/node/node_newblock.go index 5050e4d6a..cbc4ad05f 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -2,6 +2,7 @@ package node import ( "errors" + "fmt" "sort" "strings" "time" @@ -88,6 +89,8 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan consensus.ProposalTyp newBlock, err := node.ProposeNewBlock(newCommitSigsChan) if err == nil { + fmt.Printf("ProposeNewBlock: #%d :%d @%d with leader %s\n", newBlock.NumberU64(), utils.GetPort(), newBlock.Header().ViewID().Int64(), node.Consensus.GetLeaderPubKey().Bytes.Hex()) + if blk, ok := node.proposedBlock[newBlock.NumberU64()]; ok { utils.Logger().Info().Uint64("blockNum", newBlock.NumberU64()).Str("blockHash", blk.Hash().Hex()). Msg("Block with the same number was already proposed, abort.") @@ -145,6 +148,7 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error) if node.Blockchain().Config().IsStaking(header.Epoch()) { blsPubKeyBytes := leaderKey.Object.GetAddress() coinbase.SetBytes(blsPubKeyBytes[:]) + fmt.Println("coinbase.SetBytes leader: ", leaderKey.Bytes.Hex(), coinbase.Hex()) } emptyAddr := common.Address{} diff --git a/test/configs/local-resharding.txt b/test/configs/local-resharding.txt index 4ce91ece1..fc3d3e97b 100644 --- a/test/configs/local-resharding.txt +++ b/test/configs/local-resharding.txt @@ -1,25 +1,12 @@ 127.0.0.1 9000 validator .hmy/65f55eb3052f9e9f632b2923be594ba77c55543f5c58ee1454b9cfd658d25e06373b0f7d42a19c84768139ea294f6204.key -127.0.0.1 9002 validator .hmy/40379eed79ed82bebfb4310894fd33b6a3f8413a78dc4d43b98d0adc9ef69f3285df05eaab9f2ce5f7227f8cb920e809.key -127.0.0.1 9004 validator .hmy/02c8ff0b88f313717bc3a627d2f8bb172ba3ad3bb9ba3ecb8eed4b7c878653d3d4faf769876c528b73f343967f74a917.key -127.0.0.1 9006 validator .hmy/ee2474f93cba9241562efc7475ac2721ab0899edf8f7f115a656c0c1f9ef8203add678064878d174bb478fa2e6630502.key -127.0.0.1 9008 validator .hmy/e751ec995defe4931273aaebcb2cd14bf37e629c554a57d3f334c37881a34a6188a93e76113c55ef3481da23b7d7ab09.key -127.0.0.1 9010 validator .hmy/776f3b8704f4e1092a302a60e84f81e476c212d6f458092b696df420ea19ff84a6179e8e23d090b9297dc041600bc100.key -127.0.0.1 9012 validator .hmy/2d61379e44a772e5757e27ee2b3874254f56073e6bd226eb8b160371cc3c18b8c4977bd3dcb71fd57dc62bf0e143fd08.key -127.0.0.1 9014 validator .hmy/c4e4708b6cf2a2ceeb59981677e9821eebafc5cf483fb5364a28fa604cc0ce69beeed40f3f03815c9e196fdaec5f1097.key -127.0.0.1 9016 validator .hmy/86dc2fdc2ceec18f6923b99fd86a68405c132e1005cf1df72dca75db0adfaeb53d201d66af37916d61f079f34f21fb96.key -127.0.0.1 9018 validator
.hmy/49d15743b36334399f9985feb0753430a2b287b2d68b84495bbb15381854cbf01bca9d1d9f4c9c8f18509b2bfa6bd40f.key -127.0.0.1 9020 validator .hmy/95117937cd8c09acd2dfae847d74041a67834ea88662a7cbed1e170350bc329e53db151e5a0ef3e712e35287ae954818.key -127.0.0.1 9022 validator .hmy/68ae289d73332872ec8d04ac256ca0f5453c88ad392730c5741b6055bc3ec3d086ab03637713a29f459177aaa8340615.key -127.0.0.1 9200 explorer null 0 +127.0.0.1 9002 validator .hmy/02c8ff0b88f313717bc3a627d2f8bb172ba3ad3bb9ba3ecb8eed4b7c878653d3d4faf769876c528b73f343967f74a917.key +127.0.0.1 9004 validator .hmy/e751ec995defe4931273aaebcb2cd14bf37e629c554a57d3f334c37881a34a6188a93e76113c55ef3481da23b7d7ab09.key +127.0.0.1 9006 validator .hmy/2d61379e44a772e5757e27ee2b3874254f56073e6bd226eb8b160371cc3c18b8c4977bd3dcb71fd57dc62bf0e143fd08.key +127.0.0.1 9008 validator .hmy/86dc2fdc2ceec18f6923b99fd86a68405c132e1005cf1df72dca75db0adfaeb53d201d66af37916d61f079f34f21fb96.key +127.0.0.1 9010 validator .hmy/95117937cd8c09acd2dfae847d74041a67834ea88662a7cbed1e170350bc329e53db151e5a0ef3e712e35287ae954818.key +127.0.0.1 9099 explorer null 0 127.0.0.1 9100 validator .hmy/52ecce5f64db21cbe374c9268188f5d2cdd5bec1a3112276a350349860e35fb81f8cfe447a311e0550d961cf25cb988d.key -127.0.0.1 9102 validator .hmy/a547a9bf6fdde4f4934cde21473748861a3cc0fe8bbb5e57225a29f483b05b72531f002f8187675743d819c955a86100.key -127.0.0.1 9104 validator .hmy/678ec9670899bf6af85b877058bea4fc1301a5a3a376987e826e3ca150b80e3eaadffedad0fedfa111576fa76ded980c.key -127.0.0.1 9106 validator .hmy/63f479f249c59f0486fda8caa2ffb247209489dae009dfde6144ff38c370230963d360dffd318cfb26c213320e89a512.key -127.0.0.1 9108 validator .hmy/16513c487a6bb76f37219f3c2927a4f281f9dd3fd6ed2e3a64e500de6545cf391dd973cc228d24f9bd01efe94912e714.key -127.0.0.1 9110 validator .hmy/576d3c48294e00d6be4a22b07b66a870ddee03052fe48a5abbd180222e5d5a1f8946a78d55b025de21635fd743bbad90.key -127.0.0.1 9112 validator .hmy/eca09c1808b729ca56f1b5a6a287c6e1c3ae09e29ccf7efa35453471fcab07d9f73cee249e2b91f5ee44eb9618be3904.key -127.0.0.1 9114 validator .hmy/f47238daef97d60deedbde5302d05dea5de67608f11f406576e363661f7dcbc4a1385948549b31a6c70f6fde8a391486.key -127.0.0.1 9116 validator .hmy/fc4b9c535ee91f015efff3f32fbb9d32cdd9bfc8a837bb3eee89b8fff653c7af2050a4e147ebe5c7233dc2d5df06ee0a.key -127.0.0.1 9118 validator .hmy/ca86e551ee42adaaa6477322d7db869d3e203c00d7b86c82ebee629ad79cb6d57b8f3db28336778ec2180e56a8e07296.key -127.0.0.1 9300 explorer null 1 \ No newline at end of file +127.0.0.1 9102 validator .hmy/678ec9670899bf6af85b877058bea4fc1301a5a3a376987e826e3ca150b80e3eaadffedad0fedfa111576fa76ded980c.key +127.0.0.1 9104 validator .hmy/16513c487a6bb76f37219f3c2927a4f281f9dd3fd6ed2e3a64e500de6545cf391dd973cc228d24f9bd01efe94912e714.key +127.0.0.1 9106 validator .hmy/eca09c1808b729ca56f1b5a6a287c6e1c3ae09e29ccf7efa35453471fcab07d9f73cee249e2b91f5ee44eb9618be3904.key \ No newline at end of file diff --git a/test/deploy.sh b/test/deploy.sh index 9ab694311..fe22de57f 100755 --- a/test/deploy.sh +++ b/test/deploy.sh @@ -69,7 +69,7 @@ function launch_localnet() { if ${VERBOSE}; then verbosity=5 else - verbosity=3 + verbosity=5 fi base_args=(--log_folder "${log_folder}" --min_peers "${MIN}" --bootnodes "${BN_MA}" "--network_type=$NETWORK" --blspass file:"${ROOT}/.hmy/blspass.txt" "--dns=false" "--verbosity=${verbosity}" "--p2p.security.max-conn-per-ip=100") @@ -80,8 +80,11 @@ function launch_localnet() { while IFS='' read -r line || [[ -n "$line" ]]; do i=$((i + 1)) + + # Read config for i-th node from config file IFS=' ' read -r ip port mode
bls_key shard node_config <<<"${line}" + echo "LINE: ${line} ${shard}" args=("${base_args[@]}" --ip "${ip}" --port "${port}" --key "/tmp/${ip}-${port}.key" --db_dir "${ROOT}/db-${ip}-${port}" "--broadcast_invalid_tx=false") if [[ -z "$ip" || -z "$port" ]]; then echo "skip empty node"
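For reviewers touching the localnet setup: each line of local-resharding.txt is whitespace-separated as ip, port, mode, BLS key path, and an optional shard field used by explorer entries, which the deploy.sh loop above splits via IFS. A Go sketch of the same parse, with illustrative field names of my own:

```go
package main

import (
	"fmt"
	"strings"
)

// nodeLine mirrors the whitespace-separated layout that deploy.sh
// reads for each node entry.
type nodeLine struct {
	IP, Port, Mode, Key, Shard string
}

func parseNodeLine(line string) nodeLine {
	f := strings.Fields(line)
	get := func(i int) string {
		if i < len(f) {
			return f[i]
		}
		return "" // missing optional fields stay empty
	}
	return nodeLine{get(0), get(1), get(2), get(3), get(4)}
}

func main() {
	fmt.Printf("%+v\n", parseNodeLine("127.0.0.1 9099 explorer null 0"))
	fmt.Printf("%+v\n", parseNodeLine("127.0.0.1 9000 validator .hmy/example.key"))
}
```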