diff --git a/api/service/blockproposal/service.go b/api/service/blockproposal/service.go index 4d42c558a..1cbb5accf 100644 --- a/api/service/blockproposal/service.go +++ b/api/service/blockproposal/service.go @@ -18,19 +18,22 @@ type Service struct { // New returns a block proposal service. func New(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, waitForConsensusReady func(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, stopChan chan struct{}, stoppedChan chan struct{})) *Service { - return &Service{readySignal: readySignal, commitSigsChan: commitSigsChan, waitForConsensusReady: waitForConsensusReady} + return &Service{ + readySignal: readySignal, + commitSigsChan: commitSigsChan, + waitForConsensusReady: waitForConsensusReady, + stopChan: make(chan struct{}), + stoppedChan: make(chan struct{}), + } } // Start starts block proposal service. func (s *Service) Start() error { - s.stopChan = make(chan struct{}) - s.stoppedChan = make(chan struct{}) - - s.run(s.stopChan, s.stoppedChan) + s.run() return nil } -func (s *Service) run(stopChan chan struct{}, stoppedChan chan struct{}) { +func (s *Service) run() { s.waitForConsensusReady(s.readySignal, s.commitSigsChan, s.stopChan, s.stoppedChan) } diff --git a/api/service/explorer/service.go b/api/service/explorer/service.go index b5ffa1b8b..d96c1bc56 100644 --- a/api/service/explorer/service.go +++ b/api/service/explorer/service.go @@ -7,11 +7,13 @@ import ( "fmt" "net" "net/http" + "os" "path" "strconv" "time" "github.com/RoaringBitmap/roaring/roaring64" + "github.com/harmony-one/harmony/internal/common" harmonyconfig "github.com/harmony-one/harmony/internal/configs/harmony" ethCommon "github.com/ethereum/go-ethereum/common" @@ -113,12 +115,17 @@ func (s *Service) Run() *http.Server { s.router = mux.NewRouter() + fmt.Println("++", addr) + // Set up router for addresses. // Fetch addresses request, accepts parameter size: how much addresses to read, // parameter prefix: from which address prefix start s.router.Path("/addresses").Queries("size", "{[0-9]*?}", "prefix", "{[a-zA-Z0-9]*?}").HandlerFunc(s.GetAddresses).Methods("GET") s.router.Path("/addresses").HandlerFunc(s.GetAddresses) s.router.Path("/height").HandlerFunc(s.GetHeight) + s.router.Path("/leader").HandlerFunc(s.GetLeader) + s.router.Path("/blocks").HandlerFunc(s.GetBlocks) + s.router.Path("/halt").HandlerFunc(s.halt) // Set up router for supply info s.router.Path("/burn-addresses").Queries().HandlerFunc(s.GetInaccessibleAddressInfo).Methods("GET") @@ -186,6 +193,47 @@ type HeightResponse struct { S3 uint64 `json:"3,omitempty"` } +func (s *Service) GetLeader(w http.ResponseWriter, r *http.Request) { + if s.backend.IsCurrentlyLeader() { + w.Write([]byte("true ")) + } else { + w.Write([]byte("false")) + } + + keys := "" + for _, p := range s.backend.GetPublicKeys() { + addr := common.Address{} + addrBytes := p.Object.GetAddress() + addr.SetBytes(addrBytes[:]) + keys += fmt.Sprintf("%s ", addr.String()) + break + } + //blsPubKeyBytes := leaderKey.Object.GetAddress() + //coinbase.SetBytes(blsPubKeyBytes[:]) + + w.Write([]byte(fmt.Sprintf(" %d", s.blockchain.ShardID()))) + w.Write([]byte(fmt.Sprintf(" %s", s.Port))) + w.Write([]byte(fmt.Sprintf(" %s", keys))) + w.Write([]byte(fmt.Sprintf(" %s", s.backend.GetPublicKeys().SerializeToHexStr()))) + +} + +func (s *Service) GetBlocks(w http.ResponseWriter, r *http.Request) { + cur := s.blockchain.CurrentHeader().Number().Uint64() + + for i := cur; i > 0; i-- { + block := s.blockchain.GetBlockByNumber(i) + w.Write([]byte(fmt.Sprintf("%d ", i))) + w.Write([]byte(fmt.Sprintf("%s ", block.Header().ViewID().String()))) + w.Write([]byte(fmt.Sprintf("%s ", block.Header().Coinbase().Hash().Hex()))) + w.Write([]byte(fmt.Sprintf("%s\n", block.Header().Coinbase().Hex()))) + } +} + +func (s *Service) halt(w http.ResponseWriter, r *http.Request) { + os.Exit(0) +} + // GetHeight returns heights of current and beacon chains if needed. func (s *Service) GetHeight(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 437a91dc5..75eabfb2e 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -265,6 +265,8 @@ func setupNodeLog(config harmonyconfig.HarmonyConfig) { func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) { var err error + fmt.Println("OS: ", os.Args) + nodeconfigSetShardSchedule(hc) nodeconfig.SetShardingSchedule(shard.Schedule) nodeconfig.SetVersion(getHarmonyVersion()) @@ -420,8 +422,9 @@ func setupNodeAndRun(hc harmonyconfig.HarmonyConfig) { if currentNode.NodeConfig.Role() == nodeconfig.Validator { currentNode.RegisterValidatorServices() } else if currentNode.NodeConfig.Role() == nodeconfig.ExplorerNode { - currentNode.RegisterExplorerServices() + } + currentNode.RegisterExplorerServices() currentNode.RegisterService(service.CrosslinkSending, crosslink_sending.New(currentNode, currentNode.Blockchain())) if hc.Pprof.Enabled { setupPprofService(currentNode, hc) @@ -784,6 +787,7 @@ func setupConsensusAndNode(hc harmonyconfig.HarmonyConfig, nodeConfig *nodeconfi // Set the consensus ID to be the current block number viewID := currentNode.Blockchain().CurrentBlock().Header().ViewID().Uint64() + fmt.Println("viewID:", viewID) currentConsensus.SetViewIDs(viewID + 1) utils.Logger().Info(). Uint64("viewID", viewID). diff --git a/consensus/checks.go b/consensus/checks.go index ceaf9987b..c44a58da3 100644 --- a/consensus/checks.go +++ b/consensus/checks.go @@ -55,8 +55,7 @@ func (consensus *Consensus) senderKeySanityChecks(msg *msg_pb.Message, senderKey return true } -func (consensus *Consensus) isRightBlockNumAndViewID(recvMsg *FBFTMessage, -) bool { +func (consensus *Consensus) isRightBlockNumAndViewID(recvMsg *FBFTMessage) bool { if recvMsg.ViewID != consensus.GetCurBlockViewID() || recvMsg.BlockNum != consensus.BlockNum() { consensus.getLogger().Debug(). Uint64("blockNum", consensus.BlockNum()). diff --git a/consensus/consensus.go b/consensus/consensus.go index 89897e372..259801010 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -73,6 +73,8 @@ type Consensus struct { priKey multibls.PrivateKeys // the publickey of leader LeaderPubKey *bls.PublicKeyWrapper + // index of leader in the list of validators. + LeaderIndex int // blockNum: the next blockNumber that FBFT is going to agree on, // should be equal to the blockNumber of next block blockNum uint64 @@ -220,7 +222,9 @@ func New( registry *registry.Registry, Decider quorum.Decider, minPeers int, aggregateSig bool, ) (*Consensus, error) { - consensus := Consensus{} + consensus := Consensus{ + ShardID: shard, + } consensus.Decider = Decider consensus.registry = registry consensus.MinPeers = minPeers diff --git a/consensus/consensus_service.go b/consensus/consensus_service.go index 310c9bb9d..e646504dd 100644 --- a/consensus/consensus_service.go +++ b/consensus/consensus_service.go @@ -1,6 +1,7 @@ package consensus import ( + "fmt" "math/big" "sync/atomic" "time" @@ -106,11 +107,6 @@ func (consensus *Consensus) UpdatePublicKeys(pubKeys, allowlist []bls_cosi.Publi return consensus.Decider.ParticipantsCount() } -// NewFaker returns a faker consensus. -func NewFaker() *Consensus { - return &Consensus{} -} - // Sign on the hash of the message func (consensus *Consensus) signMessage(message []byte, priKey *bls_core.SecretKey) []byte { hash := hash.Keccak256(message) @@ -217,6 +213,7 @@ func (consensus *Consensus) checkViewID(msg *FBFTMessage) error { if !msg.HasSingleSender() { return errors.New("Leader message can not have multiple sender keys") } + fmt.Println("[checkViewID] Set LEADEER PUB KEY ", msg.SenderPubkeys[0].Bytes.Hex(), utils.GetPort()) consensus.LeaderPubKey = msg.SenderPubkeys[0] consensus.IgnoreViewIDCheck.UnSet() consensus.consensusTimeout[timeoutConsensus].Start() @@ -400,6 +397,7 @@ func (consensus *Consensus) UpdateConsensusInformation() Mode { Str("leaderPubKey", leaderPubKey.Bytes.Hex()). Msg("[UpdateConsensusInformation] Most Recent LeaderPubKey Updated Based on BlockChain") consensus.pubKeyLock.Lock() + fmt.Println("[UpdateConsensusInformation] Most Recent LeaderPubKey Updated Based on BlockChain", leaderPubKey.Bytes.Hex(), utils.GetPort()) consensus.LeaderPubKey = leaderPubKey consensus.pubKeyLock.Unlock() } @@ -451,16 +449,43 @@ func (consensus *Consensus) IsLeader() bool { return false } +// isLeader check if the node is a leader or not by comparing the public key of +// the node with the leader public key. This function assume it runs under lock. +func (consensus *Consensus) isLeader() bool { + obj := consensus.LeaderPubKey.Object + for _, key := range consensus.priKey { + if key.Pub.Object.IsEqual(obj) { + return true + } + } + return false +} + // SetViewIDs set both current view ID and view changing ID to the height // of the blockchain. It is used during client startup to recover the state func (consensus *Consensus) SetViewIDs(height uint64) { + fmt.Println("SetViewIDs", height) consensus.SetCurBlockViewID(height) consensus.SetViewChangingID(height) } // SetCurBlockViewID set the current view ID -func (consensus *Consensus) SetCurBlockViewID(viewID uint64) { - consensus.current.SetCurBlockViewID(viewID) +func (consensus *Consensus) SetCurBlockViewID(viewID uint64) uint64 { + return consensus.current.SetCurBlockViewID(viewID) +} + +// SetLeaderIndex set the leader index. +func (consensus *Consensus) SetLeaderIndex(f func(int) int) (current int) { + consensus.pubKeyLock.Lock() + defer consensus.pubKeyLock.Unlock() + consensus.LeaderIndex = f(consensus.LeaderIndex) + return consensus.LeaderIndex +} + +func (consensus *Consensus) GetLeaderIndex() int { + consensus.pubKeyLock.Lock() + defer consensus.pubKeyLock.Unlock() + return consensus.LeaderIndex } // SetViewChangingID set the current view change ID @@ -473,6 +498,15 @@ func (consensus *Consensus) StartFinalityCount() { consensus.finalityCounter.Store(time.Now().UnixNano()) } +//func (consensus *Consensus) ReshardingNextLeader(newblock *types.Block) { +// consensus.pubKeyLock.Lock() +// fmt.Println("nextBlock1 ", newblock.Header().Number().Uint64(), " ", consensus.LeaderPubKey.Bytes.Hex()) +// consensus.LeaderPubKey = consensus.getNextLeaderKey(consensus.GetCurBlockViewID() + 1) +// fmt.Println("nextBlock2 ", newblock.Header().Number().Uint64(), " ", consensus.LeaderPubKey.Bytes.Hex()) +// consensus.pubKeyLock.Unlock() +// +//} + // FinishFinalityCount calculate the current finality func (consensus *Consensus) FinishFinalityCount() { d := time.Now().UnixNano() diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index deb0883d9..16554d039 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/hex" + "fmt" "sync/atomic" "time" @@ -130,6 +131,7 @@ func (consensus *Consensus) HandleMessageUpdate(ctx context.Context, msg *msg_pb } func (consensus *Consensus) finalCommit() { + // THIS IS NOT GOOD PLACE FOR LEADER SWITCHING numCommits := consensus.Decider.SignersCount(quorum.Commit) consensus.getLogger().Info(). @@ -392,6 +394,7 @@ func (consensus *Consensus) Start( consensusSyncCounterVec.With(prometheus.Labels{"consensus": "out_of_sync"}).Inc() case newBlock := <-blockChannel: + //consensus.ReshardingNextLeader(newBlock) consensus.getLogger().Info(). Uint64("MsgBlockNum", newBlock.NumberU64()). Msg("[ConsensusMainLoop] Received Proposed New Block!") @@ -403,6 +406,7 @@ func (consensus *Consensus) Start( } // Sleep to wait for the full block time consensus.getLogger().Info().Msg("[ConsensusMainLoop] Waiting for Block Time") + <-time.After(time.Until(consensus.NextBlockDue)) consensus.StartFinalityCount() @@ -431,7 +435,7 @@ func (consensus *Consensus) Start( } } -// Close close the consensus. If current is in normal commit phase, wait until the commit +// Close closes the consensus. If current is in normal commit phase, wait until the commit // phase end. func (consensus *Consensus) Close() error { if consensus.dHelper != nil { @@ -527,6 +531,7 @@ func (consensus *Consensus) getLastMileBlocksAndMsg(bnStart uint64) ([]*types.Bl // preCommitAndPropose commit the current block with 67% commit signatures and start // proposing new block which will wait on the full commit signatures to finish func (consensus *Consensus) preCommitAndPropose(blk *types.Block) error { + //fmt.Println("preCommitAndPropose", utils.GetPort(), blk.NumberU64()) if blk == nil { return errors.New("block to pre-commit is nil") } @@ -644,6 +649,7 @@ func (consensus *Consensus) tryCatchup() error { consensus.getLogger().Error().Err(err).Msg("[TryCatchup] Failed to add block to chain") return err } + //fmt.Println("tryCatchup ", utils.GetPort(), blk.NumberU64()) select { // TODO: Remove this when removing dns sync and stream sync is fully up case consensus.VerifiedNewBlock <- blk: @@ -658,6 +664,8 @@ func (consensus *Consensus) tryCatchup() error { } func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMessage) error { + // this function evaluates for all, leader and validators. + if consensus.Blockchain().CurrentBlock().NumberU64() < blk.NumberU64() { if _, err := consensus.Blockchain().InsertChain([]*types.Block{blk}, !consensus.FBFTLog.IsBlockVerified(blk.Hash())); err != nil { consensus.getLogger().Error().Err(err).Msg("[commitBlock] Failed to add block to chain") @@ -682,10 +690,29 @@ func (consensus *Consensus) commitBlock(blk *types.Block, committedMsg *FBFTMess // SetupForNewConsensus sets the state for new consensus func (consensus *Consensus) SetupForNewConsensus(blk *types.Block, committedMsg *FBFTMessage) { atomic.StoreUint64(&consensus.blockNum, blk.NumberU64()+1) - consensus.SetCurBlockViewID(committedMsg.ViewID + 1) + curBlockViewID := consensus.SetCurBlockViewID(committedMsg.ViewID + 1) // first view id is going to be 2. + prev := consensus.GetLeaderPubKey() + idx := consensus.SetLeaderIndex(func(i int) int { + if curBlockViewID%3 == 0 { + return i + 1 + } + return i + }) + pps := consensus.Decider.Participants() consensus.pubKeyLock.Lock() - consensus.LeaderPubKey = committedMsg.SenderPubkeys[0] + consensus.LeaderPubKey = &pps[idx%len(pps)] + fmt.Printf("SetupForNewConsensus :%d idx: %d future v%d new: %s prev: %s %q\n", utils.GetPort(), idx, curBlockViewID, consensus.LeaderPubKey.Bytes.Hex(), prev.Bytes.Hex(), consensus.isLeader()) consensus.pubKeyLock.Unlock() + if consensus.IsLeader() && !consensus.GetLeaderPubKey().Object.IsEqual(prev.Object) { + // leader changed + go func() { + fmt.Printf("ReadySignal :%d for leader %s\n", utils.GetPort(), consensus.GetLeaderPubKey().Bytes.Hex()) + defer fmt.Printf("Defer ReadySignal :%d for leader %s\n", utils.GetPort(), consensus.GetLeaderPubKey().Bytes.Hex()) + consensus.ReadySignal <- SyncProposal + + }() + + } // Update consensus keys at last so the change of leader status doesn't mess up normal flow if blk.IsLastBlockInEpoch() { consensus.SetMode(consensus.UpdateConsensusInformation()) diff --git a/consensus/leader.go b/consensus/leader.go index 477d8eb29..a359c229a 100644 --- a/consensus/leader.go +++ b/consensus/leader.go @@ -3,12 +3,11 @@ package consensus import ( "time" + "github.com/harmony-one/harmony/consensus/signature" "github.com/harmony-one/harmony/crypto/bls" "github.com/harmony-one/harmony/internal/common" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" - "github.com/harmony-one/harmony/consensus/signature" - "github.com/ethereum/go-ethereum/rlp" bls_core "github.com/harmony-one/bls/ffi/go/bls" msg_pb "github.com/harmony-one/harmony/api/proto/message" @@ -200,9 +199,17 @@ func (consensus *Consensus) onPrepare(recvMsg *FBFTMessage) { } func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) { + // TODO HERE + //if recvMsg.ViewID == 10 { + // return + //} consensus.mutex.Lock() defer consensus.mutex.Unlock() //// Read - Start + if consensus.ShardID == 0 { + //fmt.Println("onCommit ", recvMsg.BlockNum) + } + if !consensus.isRightBlockNumAndViewID(recvMsg) { return } @@ -334,4 +341,5 @@ func (consensus *Consensus) onCommit(recvMsg *FBFTMessage) { consensus.msgSender.StopRetry(msg_pb.MessageType_PREPARED) } + //fmt.Println("onCommit99: ", utils.GetPort(), recvMsg.BlockNum) } diff --git a/consensus/quorum/quorum.go b/consensus/quorum/quorum.go index 867fd9967..7e76dfcc6 100644 --- a/consensus/quorum/quorum.go +++ b/consensus/quorum/quorum.go @@ -231,12 +231,17 @@ func (s *cIdentities) NthNextHmy(instance shardingconfig.Instance, pubKey *bls.P Msg("[NthNextHmy] pubKey not found") } numNodes := instance.NumHarmonyOperatedNodesPerShard() + //fmt.Println("??idx:", idx, numNodes) // sanity check to avoid out of bound access if numNodes <= 0 || numNodes > len(s.publicKeys) { numNodes = len(s.publicKeys) } idx = (idx + next) % numNodes - return found, &s.publicKeys[idx] + //fmt.Println("-------idx:", idx) + + new := &s.publicKeys[idx] + fmt.Println("NthNextHmy: ", pubKey.Bytes.Hex(), new.Bytes.Hex()) + return found, new } // NthNextHmyExt return the Nth next pubkey of Harmony + allowlist nodes, next can be negative number diff --git a/consensus/validator.go b/consensus/validator.go index a73ac92eb..013769395 100644 --- a/consensus/validator.go +++ b/consensus/validator.go @@ -18,6 +18,7 @@ import ( ) func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { + recvMsg, err := consensus.ParseFBFTMessage(msg) if err != nil { consensus.getLogger().Error(). @@ -26,6 +27,9 @@ func (consensus *Consensus) onAnnounce(msg *msg_pb.Message) { Msg("[OnAnnounce] Unparseable leader message") return } + if consensus.ShardID == 0 { + //fmt.Println("onAnnounce called ", recvMsg.BlockNum) + } // NOTE let it handle its own logs if !consensus.onAnnounceSanityChecks(recvMsg) { @@ -184,6 +188,9 @@ func (consensus *Consensus) sendCommitMessages(blockObj *types.Block) { // if onPrepared accepts the prepared message from the leader, then // it will send a COMMIT message for the leader to receive on the network. func (consensus *Consensus) onPrepared(recvMsg *FBFTMessage) { + if consensus.ShardID == 0 { + //fmt.Println("onPrepared", recvMsg.BlockNum) + } consensus.mutex.Lock() defer consensus.mutex.Unlock() @@ -399,6 +406,13 @@ func (consensus *Consensus) onCommitted(recvMsg *FBFTMessage) { consensus.getLogger().Info().Msg("[OnCommitted] Start consensus timer (new block added)") consensus.consensusTimeout[timeoutConsensus].Start() } + + //fmt.Println("onCommitted", utils.GetPort(), recvMsg.BlockNum) + if blk != nil { + //consensus.ReshardingNextLeader(blk) + } else { + //fmt.Println("onCommitted", utils.GetPort(), recvMsg.BlockNum, "blk is nil") + } } // Collect private keys that are part of the current committee. diff --git a/consensus/view_change.go b/consensus/view_change.go index 5bfd49f83..2936fba01 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -1,6 +1,7 @@ package consensus import ( + "fmt" "math/big" "sync" "time" @@ -67,10 +68,11 @@ func (pm *State) GetCurBlockViewID() uint64 { } // SetCurBlockViewID sets the current view id -func (pm *State) SetCurBlockViewID(viewID uint64) { +func (pm *State) SetCurBlockViewID(viewID uint64) uint64 { pm.cViewMux.Lock() defer pm.cViewMux.Unlock() pm.blockViewID = viewID + return pm.blockViewID } // GetViewChangingID return the current view changing id @@ -160,6 +162,7 @@ func (consensus *Consensus) getNextViewID() (uint64, time.Duration) { Uint64("stuckBlockViewID", stuckBlockViewID). Msg("[getNextViewID]") + fmt.Println("end getNextViewID: ", nextViewID, viewChangeDuration) // duration is always the fixed view change duration for synchronous view change return nextViewID, viewChangeDuration } @@ -171,7 +174,8 @@ func (consensus *Consensus) getNextViewID() (uint64, time.Duration) { func (consensus *Consensus) getNextLeaderKey(viewID uint64) *bls.PublicKeyWrapper { gap := 1 - if viewID > consensus.GetCurBlockViewID() { + cur := consensus.GetCurBlockViewID() + if viewID > cur { gap = int(viewID - consensus.GetCurBlockViewID()) } var lastLeaderPubKey *bls.PublicKeyWrapper @@ -231,6 +235,7 @@ func (consensus *Consensus) getNextLeaderKey(viewID uint64) *bls.PublicKeyWrappe lastLeaderPubKey, gap) } + fmt.Println("wasfoundNext", consensus.Blockchain.Config().IsAllowlistEpoch(epoch), wasFound, next.Bytes.Hex(), lastLeaderPubKey.Bytes.Hex()) if !wasFound { consensus.getLogger().Warn(). Str("key", consensus.LeaderPubKey.Bytes.Hex()). @@ -252,6 +257,7 @@ func createTimeout() map[TimeoutType]*utils.Timeout { // startViewChange start the view change process func (consensus *Consensus) startViewChange() { + fmt.Printf("Message to send leader111: %d %s \n", utils.GetPort(), consensus.LeaderPubKey.Bytes.Hex()) if consensus.disableViewChange || consensus.IsBackup() { return } @@ -262,6 +268,7 @@ func (consensus *Consensus) startViewChange() { consensus.consensusTimeout[timeoutBootstrap].Stop() consensus.current.SetMode(ViewChanging) nextViewID, duration := consensus.getNextViewID() + //fmt.Println("startViewChange", nextViewID) consensus.SetViewChangingID(nextViewID) // TODO: set the Leader PubKey to the next leader for view change // this is dangerous as the leader change is not succeeded yet @@ -270,7 +277,10 @@ func (consensus *Consensus) startViewChange() { // Ideally, we shall use another variable to keep track of the // leader pubkey in viewchange mode consensus.pubKeyLock.Lock() - consensus.LeaderPubKey = consensus.getNextLeaderKey(nextViewID) + lpk := consensus.getNextLeaderKey(nextViewID) + consensus.LeaderPubKey = lpk + //fmt.Println("Message to send leader cur: ", consensus.LeaderPubKey.Bytes.Hex(), "next: ", lpk.Bytes.Hex()) + //fmt.Println("Message to send leader: ", consensus.LeaderPubKey.Bytes.Hex()) consensus.pubKeyLock.Unlock() consensus.getLogger().Warn(). @@ -304,7 +314,9 @@ func (consensus *Consensus) startViewChange() { if !consensus.IsValidatorInCommittee(key.Pub.Bytes) { continue } + // Тут уже другой leader msgToSend := consensus.constructViewChangeMessage(&key) + fmt.Println("Message to send leader222: ", consensus.LeaderPubKey.Bytes.Hex()) if err := consensus.msgSender.SendWithRetry( consensus.BlockNum(), msg_pb.MessageType_VIEWCHANGE, @@ -362,6 +374,7 @@ func (consensus *Consensus) startNewView(viewID uint64, newLeaderPriKey *bls.Pri if reset { consensus.ResetState() } + fmt.Println("[startNewView]", newLeaderPriKey.Pub.Bytes.Hex()) consensus.LeaderPubKey = newLeaderPriKey.Pub return nil @@ -369,6 +382,7 @@ func (consensus *Consensus) startNewView(viewID uint64, newLeaderPriKey *bls.Pri // onViewChange is called when the view change message is received. func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) { + //fmt.Printf("[onViewChange] received view change message from %+v\n", recvMsg) consensus.mutex.Lock() defer consensus.mutex.Unlock() @@ -391,6 +405,13 @@ func (consensus *Consensus) onViewChange(recvMsg *FBFTMessage) { return } + consensus.getLogger().Debug(). + Err(err). + Interface("SenderPubkeys", recvMsg.SenderPubkeys). + Str("NextLeader", recvMsg.LeaderPubkey.Bytes.Hex()). + Str("myBLSPubKey", consensus.priKey.GetPublicKeys().SerializeToHexStr()). + Msg("[onViewChange] I am the Leader") + if consensus.Decider.IsQuorumAchievedByMask(consensus.vc.GetViewIDBitmap(recvMsg.ViewID)) { consensus.getLogger().Info(). Int64("have", consensus.Decider.SignersCount(quorum.ViewChange)). @@ -471,6 +492,8 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) { consensus.mutex.Lock() defer consensus.mutex.Unlock() + fmt.Printf("[onNewView] received new view message from %+v\n", recvMsg) + consensus.getLogger().Info(). Uint64("viewID", recvMsg.ViewID). Uint64("blockNum", recvMsg.BlockNum). @@ -559,6 +582,7 @@ func (consensus *Consensus) onNewView(recvMsg *FBFTMessage) { // newView message verified success, override my state consensus.SetViewIDs(recvMsg.ViewID) consensus.pubKeyLock.Lock() + fmt.Println("[onNewView1221] new leader key cur:", consensus.LeaderPubKey.Bytes.Hex(), " new: ", senderKey.Bytes.Hex()) consensus.LeaderPubKey = senderKey consensus.pubKeyLock.Unlock() consensus.ResetViewChangeState() diff --git a/hmy/hmy.go b/hmy/hmy.go index 58e8d59ed..8089f04ba 100644 --- a/hmy/hmy.go +++ b/hmy/hmy.go @@ -18,6 +18,7 @@ import ( "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/vm" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" + "github.com/harmony-one/harmony/multibls" commonRPC "github.com/harmony-one/harmony/rpc/common" "github.com/harmony-one/harmony/shard" staking "github.com/harmony-one/harmony/staking/types" @@ -96,6 +97,7 @@ type NodeAPI interface { GetStakingTransactionsCount(address, txType string) (uint64, error) GetTraceResultByHash(hash common.Hash) (json.RawMessage, error) IsCurrentlyLeader() bool + GetPublicKeys() multibls.PublicKeys IsOutOfSync(shardID uint32) bool SyncStatus(shardID uint32) (bool, uint64, uint64) SyncPeers() map[string]int diff --git a/internal/utils/singleton.go b/internal/utils/singleton.go index 6409ea71e..7aee4d86b 100644 --- a/internal/utils/singleton.go +++ b/internal/utils/singleton.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "path" + "strconv" "sync" "time" @@ -195,3 +196,18 @@ func updateZeroLogLevel(level int) { childLogger := Logger().Level(zeroLoggerLevel) zeroLogger = &childLogger } + +func GetPort() int { + ok := false + for _, x := range os.Args { + if x == "--port" { + ok = true + continue + } + if ok { + rs, _ := strconv.ParseInt(x, 10, 64) + return int(rs) + } + } + return 0 +} diff --git a/node/api.go b/node/api.go index ceda96808..debcb201f 100644 --- a/node/api.go +++ b/node/api.go @@ -6,6 +6,7 @@ import ( "github.com/harmony-one/harmony/eth/rpc" "github.com/harmony-one/harmony/hmy" "github.com/harmony-one/harmony/internal/tikv" + "github.com/harmony-one/harmony/multibls" "github.com/harmony-one/harmony/rosetta" hmy_rpc "github.com/harmony-one/harmony/rpc" rpc_common "github.com/harmony-one/harmony/rpc/common" @@ -18,6 +19,11 @@ func (node *Node) IsCurrentlyLeader() bool { return node.Consensus.IsLeader() } +// GetPublicKeys exposes if node is currently the leader node +func (node *Node) GetPublicKeys() multibls.PublicKeys { + return node.Consensus.GetPrivateKeys().GetPublicKeys() +} + // PeerConnectivity .. func (node *Node) PeerConnectivity() (int, int, int) { return node.host.PeerConnectivity() diff --git a/node/node_newblock.go b/node/node_newblock.go index 03fd69d9d..24e26ad99 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -2,6 +2,7 @@ package node import ( "errors" + "fmt" "sort" "strings" "time" @@ -88,6 +89,8 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan consensus.ProposalTyp newBlock, err := node.ProposeNewBlock(newCommitSigsChan) if err == nil { + fmt.Printf("ProposeNewBlock: #%d :%d @%d with leader %s\n", newBlock.NumberU64(), utils.GetPort(), newBlock.Header().ViewID().Int64(), node.Consensus.GetLeaderPubKey().Bytes.Hex()) + if blk, ok := node.proposedBlock[newBlock.NumberU64()]; ok { utils.Logger().Info().Uint64("blockNum", newBlock.NumberU64()).Str("blockHash", blk.Hash().Hex()). Msg("Block with the same number was already proposed, abort.") @@ -145,6 +148,7 @@ func (node *Node) ProposeNewBlock(commitSigs chan []byte) (*types.Block, error) if node.Blockchain().Config().IsStaking(header.Epoch()) { blsPubKeyBytes := leaderKey.Object.GetAddress() coinbase.SetBytes(blsPubKeyBytes[:]) + fmt.Println("coinbase.SetBytes leader: ", leaderKey.Bytes.Hex(), coinbase.Hex()) } emptyAddr := common.Address{} diff --git a/test/configs/local-resharding.txt b/test/configs/local-resharding.txt index 4ce91ece1..fc3d3e97b 100644 --- a/test/configs/local-resharding.txt +++ b/test/configs/local-resharding.txt @@ -1,25 +1,12 @@ 127.0.0.1 9000 validator .hmy/65f55eb3052f9e9f632b2923be594ba77c55543f5c58ee1454b9cfd658d25e06373b0f7d42a19c84768139ea294f6204.key -127.0.0.1 9002 validator .hmy/40379eed79ed82bebfb4310894fd33b6a3f8413a78dc4d43b98d0adc9ef69f3285df05eaab9f2ce5f7227f8cb920e809.key -127.0.0.1 9004 validator .hmy/02c8ff0b88f313717bc3a627d2f8bb172ba3ad3bb9ba3ecb8eed4b7c878653d3d4faf769876c528b73f343967f74a917.key -127.0.0.1 9006 validator .hmy/ee2474f93cba9241562efc7475ac2721ab0899edf8f7f115a656c0c1f9ef8203add678064878d174bb478fa2e6630502.key -127.0.0.1 9008 validator .hmy/e751ec995defe4931273aaebcb2cd14bf37e629c554a57d3f334c37881a34a6188a93e76113c55ef3481da23b7d7ab09.key -127.0.0.1 9010 validator .hmy/776f3b8704f4e1092a302a60e84f81e476c212d6f458092b696df420ea19ff84a6179e8e23d090b9297dc041600bc100.key -127.0.0.1 9012 validator .hmy/2d61379e44a772e5757e27ee2b3874254f56073e6bd226eb8b160371cc3c18b8c4977bd3dcb71fd57dc62bf0e143fd08.key -127.0.0.1 9014 validator .hmy/c4e4708b6cf2a2ceeb59981677e9821eebafc5cf483fb5364a28fa604cc0ce69beeed40f3f03815c9e196fdaec5f1097.key -127.0.0.1 9016 validator .hmy/86dc2fdc2ceec18f6923b99fd86a68405c132e1005cf1df72dca75db0adfaeb53d201d66af37916d61f079f34f21fb96.key -127.0.0.1 9018 validator .hmy/49d15743b36334399f9985feb0753430a2b287b2d68b84495bbb15381854cbf01bca9d1d9f4c9c8f18509b2bfa6bd40f.key -127.0.0.1 9020 validator .hmy/95117937cd8c09acd2dfae847d74041a67834ea88662a7cbed1e170350bc329e53db151e5a0ef3e712e35287ae954818.key -127.0.0.1 9022 validator .hmy/68ae289d73332872ec8d04ac256ca0f5453c88ad392730c5741b6055bc3ec3d086ab03637713a29f459177aaa8340615.key -127.0.0.1 9200 explorer null 0 +127.0.0.1 9002 validator .hmy/02c8ff0b88f313717bc3a627d2f8bb172ba3ad3bb9ba3ecb8eed4b7c878653d3d4faf769876c528b73f343967f74a917.key +127.0.0.1 9004 validator .hmy/e751ec995defe4931273aaebcb2cd14bf37e629c554a57d3f334c37881a34a6188a93e76113c55ef3481da23b7d7ab09.key +127.0.0.1 9006 validator .hmy/2d61379e44a772e5757e27ee2b3874254f56073e6bd226eb8b160371cc3c18b8c4977bd3dcb71fd57dc62bf0e143fd08.key +127.0.0.1 9008 validator .hmy/86dc2fdc2ceec18f6923b99fd86a68405c132e1005cf1df72dca75db0adfaeb53d201d66af37916d61f079f34f21fb96.key +127.0.0.1 9010 validator .hmy/95117937cd8c09acd2dfae847d74041a67834ea88662a7cbed1e170350bc329e53db151e5a0ef3e712e35287ae954818.key +127.0.0.1 9099 explorer null 0 127.0.0.1 9100 validator .hmy/52ecce5f64db21cbe374c9268188f5d2cdd5bec1a3112276a350349860e35fb81f8cfe447a311e0550d961cf25cb988d.key -127.0.0.1 9102 validator .hmy/a547a9bf6fdde4f4934cde21473748861a3cc0fe8bbb5e57225a29f483b05b72531f002f8187675743d819c955a86100.key -127.0.0.1 9104 validator .hmy/678ec9670899bf6af85b877058bea4fc1301a5a3a376987e826e3ca150b80e3eaadffedad0fedfa111576fa76ded980c.key -127.0.0.1 9106 validator .hmy/63f479f249c59f0486fda8caa2ffb247209489dae009dfde6144ff38c370230963d360dffd318cfb26c213320e89a512.key -127.0.0.1 9108 validator .hmy/16513c487a6bb76f37219f3c2927a4f281f9dd3fd6ed2e3a64e500de6545cf391dd973cc228d24f9bd01efe94912e714.key -127.0.0.1 9110 validator .hmy/576d3c48294e00d6be4a22b07b66a870ddee03052fe48a5abbd180222e5d5a1f8946a78d55b025de21635fd743bbad90.key -127.0.0.1 9112 validator .hmy/eca09c1808b729ca56f1b5a6a287c6e1c3ae09e29ccf7efa35453471fcab07d9f73cee249e2b91f5ee44eb9618be3904.key -127.0.0.1 9114 validator .hmy/f47238daef97d60deedbde5302d05dea5de67608f11f406576e363661f7dcbc4a1385948549b31a6c70f6fde8a391486.key -127.0.0.1 9116 validator .hmy/fc4b9c535ee91f015efff3f32fbb9d32cdd9bfc8a837bb3eee89b8fff653c7af2050a4e147ebe5c7233dc2d5df06ee0a.key -127.0.0.1 9118 validator .hmy/ca86e551ee42adaaa6477322d7db869d3e203c00d7b86c82ebee629ad79cb6d57b8f3db28336778ec2180e56a8e07296.key -127.0.0.1 9300 explorer null 1 \ No newline at end of file +127.0.0.1 9102 validator .hmy/678ec9670899bf6af85b877058bea4fc1301a5a3a376987e826e3ca150b80e3eaadffedad0fedfa111576fa76ded980c.key +127.0.0.1 9104 validator .hmy/16513c487a6bb76f37219f3c2927a4f281f9dd3fd6ed2e3a64e500de6545cf391dd973cc228d24f9bd01efe94912e714.key +127.0.0.1 9106 validator .hmy/eca09c1808b729ca56f1b5a6a287c6e1c3ae09e29ccf7efa35453471fcab07d9f73cee249e2b91f5ee44eb9618be3904.key \ No newline at end of file diff --git a/test/deploy.sh b/test/deploy.sh index 6bbb12eb9..5a949637e 100755 --- a/test/deploy.sh +++ b/test/deploy.sh @@ -69,7 +69,7 @@ function launch_localnet() { if ${VERBOSE}; then verbosity=5 else - verbosity=3 + verbosity=5 fi base_args=(--log_folder "${log_folder}" --min_peers "${MIN}" --bootnodes "${BN_MA}" "--network_type=$NETWORK" --blspass file:"${ROOT}/.hmy/blspass.txt" "--dns=false" "--verbosity=${verbosity}" "--p2p.security.max-conn-per-ip=100") @@ -80,8 +80,11 @@ function launch_localnet() { while IFS='' read -r line || [[ -n "$line" ]]; do i=$((i + 1)) + + # Read config for i-th node form config file IFS=' ' read -r ip port mode bls_key shard node_config <<<"${line}" + echo "LINE: ${line} ${shard}" args=("${base_args[@]}" --ip "${ip}" --port "${port}" --key "/tmp/${ip}-${port}.key" --db_dir "${ROOT}/db-${ip}-${port}" "--broadcast_invalid_tx=false") if [[ -z "$ip" || -z "$port" ]]; then echo "skip empty node"