diff --git a/api/service/syncing/downloader/client.go b/api/service/syncing/downloader/client.go index 20b3e2302..cdaf4f96e 100644 --- a/api/service/syncing/downloader/client.go +++ b/api/service/syncing/downloader/client.go @@ -41,10 +41,12 @@ func (client *Client) Close() { } // GetBlockHashes gets block hashes from all the peers by calling grpc request. -func (client *Client) GetBlockHashes(startHash []byte, size uint32) *pb.DownloaderResponse { +func (client *Client) GetBlockHashes(startHash []byte, size uint32, ip, port string) *pb.DownloaderResponse { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_HEADER, BlockHash: startHash, Size: size} + request.Ip = ip + request.Port = port response, err := client.dlClient.Query(ctx, request) if err != nil { utils.Logger().Error().Err(err).Msg("[SYNC] GetBlockHashes query failed") @@ -109,13 +111,13 @@ func (client *Client) PushNewBlock(selfPeerHash [20]byte, blockHash []byte, time } // GetBlockChainHeight gets the blockheight from peer -func (client *Client) GetBlockChainHeight() *pb.DownloaderResponse { +func (client *Client) GetBlockChainHeight() (*pb.DownloaderResponse, error) { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_BLOCKHEIGHT} response, err := client.dlClient.Query(ctx, request) if err != nil { - utils.Logger().Error().Err(err).Msg("[SYNC] unable to get blockchain height") + return nil, err } - return response + return response, nil } diff --git a/api/service/syncing/downloader/interface.go b/api/service/syncing/downloader/interface.go index b74788451..d02d2d836 100644 --- a/api/service/syncing/downloader/interface.go +++ b/api/service/syncing/downloader/interface.go @@ -7,5 +7,6 @@ import ( // DownloadInterface is the interface for downloader package. type DownloadInterface interface { // State Syncing server-side interface, responsible for all kinds of state syncing grpc calls - CalculateResponse(request *pb.DownloaderRequest) (*pb.DownloaderResponse, error) + // incomingPeer is incoming peer ip:port information + CalculateResponse(request *pb.DownloaderRequest, incomingPeer string) (*pb.DownloaderResponse, error) } diff --git a/api/service/syncing/downloader/server.go b/api/service/syncing/downloader/server.go index fb3548681..d1953aa8f 100644 --- a/api/service/syncing/downloader/server.go +++ b/api/service/syncing/downloader/server.go @@ -9,6 +9,7 @@ import ( "github.com/harmony-one/harmony/internal/utils" "google.golang.org/grpc" + "google.golang.org/grpc/peer" ) // Constants for downloader server. @@ -24,7 +25,15 @@ type Server struct { // Query returns the feature at the given point. func (s *Server) Query(ctx context.Context, request *pb.DownloaderRequest) (*pb.DownloaderResponse, error) { - response, err := s.downloadInterface.CalculateResponse(request) + var pinfo string + // retrieve ip/port information; used for debug only + p, ok := peer.FromContext(ctx) + if !ok { + pinfo = "" + } else { + pinfo = p.Addr.String() + } + response, err := s.downloadInterface.CalculateResponse(request, pinfo) if err != nil { return nil, err } diff --git a/api/service/syncing/syncing.go b/api/service/syncing/syncing.go index fd45bf9e8..2c15bba59 100644 --- a/api/service/syncing/syncing.go +++ b/api/service/syncing/syncing.go @@ -334,8 +334,12 @@ func (ss *StateSync) GetConsensusHashes(startHash []byte, size uint32) bool { wg.Add(1) go func() { defer wg.Done() - response := peerConfig.client.GetBlockHashes(startHash, size) + response := peerConfig.client.GetBlockHashes(startHash, size, ss.selfip, ss.selfport) if response == nil { + utils.Logger().Warn(). + Str("peer IP", peerConfig.ip). + Str("peer Port", peerConfig.port). + Msg("[SYNC] GetConsensusHashes Nil Response") return } if len(response.Payload) > int(size+1) { @@ -677,8 +681,12 @@ func (ss *StateSync) getMaxPeerHeight() uint64 { go func() { defer wg.Done() //debug - // utils.Logger().Warn().Str("IP", peerConfig.ip).Str("Port", peerConfig.port).Msg("[Sync] getMaxPeerHeight") - response := peerConfig.client.GetBlockChainHeight() + // utils.Logger().Warn().Str("IP", peerConfig.ip).Str("Port", peerConfig.port).Msg("[Sync]getMaxPeerHeight") + response, err := peerConfig.client.GetBlockChainHeight() + if err != nil { + utils.Logger().Warn().Err(err).Str("IP", peerConfig.ip).Str("Port", peerConfig.port).Msg("[Sync]GetBlockChainHeight failed") + return + } ss.syncMux.Lock() if response != nil && maxHeight < response.BlockHeight { maxHeight = response.BlockHeight diff --git a/cmd/client/txgen/main.go b/cmd/client/txgen/main.go index 73e4807f4..14ef4084d 100644 --- a/cmd/client/txgen/main.go +++ b/cmd/client/txgen/main.go @@ -119,7 +119,6 @@ func setUpTXGen() *node.Node { } txGen.NodeConfig.SetRole(nodeconfig.ClientNode) if shardID == 0 { - txGen.NodeConfig.SetIsBeacon(true) txGen.NodeConfig.SetShardGroupID(p2p.GroupIDBeacon) } else { txGen.NodeConfig.SetShardGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(shardID))) diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 667ad3712..1670579f4 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -88,9 +88,7 @@ var ( // networkType indicates the type of the network networkType = flag.String("network_type", "mainnet", "type of the network: mainnet, testnet, devnet, localnet") // blockPeriod indicates the how long the leader waits to propose a new block. - blockPeriod = flag.Int("block_period", 8, "how long in second the leader waits to propose a new block.") - // isNewNode indicates this node is a new node - isNewNode = flag.Bool("is_newnode", false, "true means this node is a new node") + blockPeriod = flag.Int("block_period", 8, "how long in second the leader waits to propose a new block.") leaderOverride = flag.Bool("leader_override", false, "true means override the default leader role and acts as validator") // shardID indicates the shard ID of this node shardID = flag.Int("shard_id", -1, "the shard ID of this node") @@ -110,7 +108,7 @@ var ( keystoreDir = flag.String("keystore", hmykey.DefaultKeyStoreDir, "The default keystore directory") - genesisAccount = &genesis.DeployAccount{} + initialAccount = &genesis.DeployAccount{} // logging verbosity verbosity = flag.Int("verbosity", 5, "Logging verbosity: 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=detail (default: 5)") @@ -160,11 +158,6 @@ func initSetup() { } utils.BootNodes = bootNodeAddrs } - - // Set up manual call for garbage collection. - if *enableGC { - memprofiling.MaybeCallGCPeriodically() - } } func passphraseForBls() { @@ -181,34 +174,34 @@ func passphraseForBls() { blsPassphrase = passphrase } -func setupGenesisAccount() (isLeader bool) { +func setupInitialAccount() (isLeader bool) { genesisShardingConfig := core.ShardingSchedule.InstanceForEpoch(big.NewInt(core.GenesisEpoch)) - pubKey := setUpConsensusKey(nodeconfig.GetDefaultConfig()) + pubKey := setupConsensusKey(nodeconfig.GetDefaultConfig()) reshardingEpoch := genesisShardingConfig.ReshardingEpoch() if reshardingEpoch != nil && len(reshardingEpoch) > 0 { for _, epoch := range reshardingEpoch { config := core.ShardingSchedule.InstanceForEpoch(epoch) - isLeader, genesisAccount = config.FindAccount(pubKey.SerializeToHexStr()) - if genesisAccount != nil { + isLeader, initialAccount = config.FindAccount(pubKey.SerializeToHexStr()) + if initialAccount != nil { break } } } else { - isLeader, genesisAccount = genesisShardingConfig.FindAccount(pubKey.SerializeToHexStr()) + isLeader, initialAccount = genesisShardingConfig.FindAccount(pubKey.SerializeToHexStr()) } - if genesisAccount == nil { + if initialAccount == nil { fmt.Printf("cannot find your BLS key in the genesis/FN tables: %s\n", pubKey.SerializeToHexStr()) os.Exit(100) } - fmt.Printf("My Genesis Account: %v\n", *genesisAccount) + fmt.Printf("My Genesis Account: %v\n", *initialAccount) return isLeader } -func setUpConsensusKey(nodeConfig *nodeconfig.ConfigType) *bls.PublicKey { +func setupConsensusKey(nodeConfig *nodeconfig.ConfigType) *bls.PublicKey { consensusPriKey, err := blsgen.LoadBlsKeyWithPassPhrase(*blsKeyFile, blsPassphrase) if err != nil { fmt.Printf("error when loading bls key, err :%v\n", err) @@ -225,13 +218,13 @@ func setUpConsensusKey(nodeConfig *nodeconfig.ConfigType) *bls.PublicKey { return pubKey } -func createGlobalConfig(isLeader bool) *nodeconfig.ConfigType { +func createGlobalConfig() *nodeconfig.ConfigType { var err error - nodeConfig := nodeconfig.GetShardConfig(genesisAccount.ShardID) + nodeConfig := nodeconfig.GetShardConfig(initialAccount.ShardID) if !*isExplorer { // Set up consensus keys. - setUpConsensusKey(nodeConfig) + setupConsensusKey(nodeConfig) } else { nodeConfig.ConsensusPriKey = &bls.SecretKey{} // set dummy bls key for consensus object } @@ -257,13 +250,6 @@ func createGlobalConfig(isLeader bool) *nodeconfig.ConfigType { } nodeConfig.SelfPeer = p2p.Peer{IP: *ip, Port: *port, ConsensusPubKey: nodeConfig.ConsensusPubKey} - if isLeader && !*isExplorer && !*leaderOverride { // The first node in a shard is the leader at genesis - nodeConfig.Leader = nodeConfig.SelfPeer - nodeConfig.StringRole = "leader" - } else { - nodeConfig.StringRole = "validator" - } - nodeConfig.Host, err = p2pimpl.NewHost(&nodeConfig.SelfPeer, nodeConfig.P2pPriKey) if *logConn && nodeConfig.GetNetworkType() != nodeconfig.Mainnet { nodeConfig.Host.GetP2PHost().Network().Notify(utils.NewConnLogger(utils.GetLogInstance())) @@ -272,22 +258,17 @@ func createGlobalConfig(isLeader bool) *nodeconfig.ConfigType { panic("unable to new host in harmony") } - if err := nodeConfig.Host.AddPeer(&nodeConfig.Leader); err != nil { - ctxerror.Warn(utils.GetLogger(), err, "(*p2p.Host).AddPeer failed", - "peer", &nodeConfig.Leader) - } - nodeConfig.DBDir = *dbDir return nodeConfig } -func setUpConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node { +func setupConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node { // Consensus object. // TODO: consensus object shouldn't start here // TODO(minhdoan): During refactoring, found out that the peers list is actually empty. Need to clean up the logic of consensus later. currentConsensus, err := consensus.New(nodeConfig.Host, nodeConfig.ShardID, nodeConfig.Leader, nodeConfig.ConsensusPriKey) - currentConsensus.SelfAddress = common.ParseAddr(genesisAccount.Address) + currentConsensus.SelfAddress = common.ParseAddr(initialAccount.Address) if err != nil { fmt.Fprintf(os.Stderr, "Error :%v \n", err) @@ -301,10 +282,6 @@ func setUpConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node { currentConsensus.SetCommitDelay(commitDelay) currentConsensus.MinPeers = *minPeers - if *isNewNode { - currentConsensus.SetMode(consensus.Listening) - } - if *disableViewChange { currentConsensus.DisableViewChangeForTestingOnly() } @@ -317,7 +294,6 @@ func setUpConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node { } else if *dnsFlag { currentNode.SetDNSZone("t.hmny.io") } - currentNode.NodeConfig.SetRole(nodeconfig.NewNode) // TODO: add staking support // currentNode.StakingAccount = myAccount utils.GetLogInstance().Info("node account set", @@ -326,45 +302,21 @@ func setUpConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node { // TODO: refactor the creation of blockchain out of node.New() currentConsensus.ChainReader = currentNode.Blockchain() - // TODO: the setup should only based on shard state - if *isGenesis { - // TODO: need change config file and use switch instead of complicated "if else" condition - if nodeConfig.ShardID == 0 { // Beacon chain - nodeConfig.SetIsBeacon(true) - if nodeConfig.StringRole == "leader" { - currentNode.NodeConfig.SetRole(nodeconfig.BeaconLeader) - } else { - currentNode.NodeConfig.SetRole(nodeconfig.BeaconValidator) - } + if *isExplorer { + currentNode.NodeConfig.SetRole(nodeconfig.ExplorerNode) + currentNode.NodeConfig.SetShardGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(*shardID))) + currentNode.NodeConfig.SetClientGroupID(p2p.NewClientGroupIDByShardID(p2p.ShardID(*shardID))) + } else { + if nodeConfig.ShardID == 0 { + currentNode.NodeConfig.SetRole(nodeconfig.Validator) currentNode.NodeConfig.SetShardGroupID(p2p.GroupIDBeacon) currentNode.NodeConfig.SetClientGroupID(p2p.GroupIDBeaconClient) } else { - if nodeConfig.StringRole == "leader" { - currentNode.NodeConfig.SetRole(nodeconfig.ShardLeader) - } else { - currentNode.NodeConfig.SetRole(nodeconfig.ShardValidator) - } + currentNode.NodeConfig.SetRole(nodeconfig.Validator) currentNode.NodeConfig.SetShardGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(nodeConfig.ShardID))) currentNode.NodeConfig.SetClientGroupID(p2p.NewClientGroupIDByShardID(p2p.ShardID(nodeConfig.ShardID))) } - } else { - if *isNewNode { - if nodeConfig.ShardID == 0 { // Beacon chain - nodeConfig.SetIsBeacon(true) - currentNode.NodeConfig.SetRole(nodeconfig.BeaconValidator) - currentNode.NodeConfig.SetShardGroupID(p2p.GroupIDBeacon) - currentNode.NodeConfig.SetClientGroupID(p2p.GroupIDBeaconClient) - } else { - currentNode.NodeConfig.SetRole(nodeconfig.ShardValidator) - currentNode.NodeConfig.SetShardGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(nodeConfig.ShardID))) - currentNode.NodeConfig.SetClientGroupID(p2p.NewClientGroupIDByShardID(p2p.ShardID(nodeConfig.ShardID))) - } - } - if *isExplorer { - currentNode.NodeConfig.SetRole(nodeconfig.ExplorerNode) - currentNode.NodeConfig.SetShardGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(*shardID))) - currentNode.NodeConfig.SetClientGroupID(p2p.NewClientGroupIDByShardID(p2p.ShardID(*shardID))) - } + } currentNode.NodeConfig.ConsensusPubKey = nodeConfig.ConsensusPubKey currentNode.NodeConfig.ConsensusPriKey = nodeConfig.ConsensusPriKey @@ -380,12 +332,9 @@ func setUpConsensusAndNode(nodeConfig *nodeconfig.ConfigType) *node.Node { // currentNode.DRand = dRand // This needs to be executed after consensus and drand are setup - if !*isNewNode || *shardID > -1 { // initial staking new node doesn't need to initialize shard state - // TODO: Have a better way to distinguish non-genesis node - if err := currentNode.InitShardState(*shardID == -1 && !*isNewNode); err != nil { - ctxerror.Crit(utils.GetLogger(), err, "InitShardState failed", - "shardID", *shardID, "isNewNode", *isNewNode) - } + if err := currentNode.InitShardState(); err != nil { + ctxerror.Crit(utils.GetLogger(), err, "InitShardState failed", + "shardID", *shardID) } // Set the consensus ID to be the current block number @@ -443,17 +392,17 @@ func main() { memprofiling.MaybeCallGCPeriodically() } - isLeader := false - if !*isExplorer { // Explorer node doesn't need the following setup - isLeader = setupGenesisAccount() + if !*isExplorer { + setupInitialAccount() } + if *shardID >= 0 { - utils.GetLogInstance().Info("ShardID Override", "original", genesisAccount.ShardID, "override", *shardID) - genesisAccount.ShardID = uint32(*shardID) + utils.GetLogInstance().Info("ShardID Override", "original", initialAccount.ShardID, "override", *shardID) + initialAccount.ShardID = uint32(*shardID) } - nodeConfig := createGlobalConfig(isLeader) - currentNode := setUpConsensusAndNode(nodeConfig) + nodeConfig := createGlobalConfig() + currentNode := setupConsensusAndNode(nodeConfig) //if consensus.ShardID != 0 { // go currentNode.SupportBeaconSyncing() diff --git a/consensus/consensus.go b/consensus/consensus.go index e3baca773..596a57d41 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -146,6 +146,8 @@ type Consensus struct { // The p2p host used to send/receive p2p messages host p2p.Host + // MessageSender takes are of sending consensus message and the corresponding retry logic. + msgSender *MessageSender // Staking information finder stakeInfoFinder StakeInfoFinder @@ -242,6 +244,7 @@ type StakeInfoFinder interface { func New(host p2p.Host, ShardID uint32, leader p2p.Peer, blsPriKey *bls.SecretKey) (*Consensus, error) { consensus := Consensus{} consensus.host = host + consensus.msgSender = NewMessageSender(host) consensus.blockNumLowChan = make(chan struct{}) // pbft related diff --git a/consensus/consensus_msg_sender.go b/consensus/consensus_msg_sender.go new file mode 100644 index 000000000..97427a459 --- /dev/null +++ b/consensus/consensus_msg_sender.go @@ -0,0 +1,138 @@ +package consensus + +import ( + "sync" + "time" + + msg_pb "github.com/harmony-one/harmony/api/proto/message" + "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/p2p" +) + +const ( + // RetryIntervalInSec is the interval for message retry + RetryIntervalInSec = 10 +) + +// MessageSender is the wrapper object that controls how a consensus message is sent +type MessageSender struct { + blockNum uint64 // The current block number at consensus + blockNumMutex sync.Mutex + messagesToRetry sync.Map + // The p2p host used to send/receive p2p messages + host p2p.Host + // RetryTimes is number of retry attempts + retryTimes int +} + +// MessageRetry controls the message that can be retried +type MessageRetry struct { + blockNum uint64 // The block number this message is for + groups []p2p.GroupID + p2pMsg []byte + msgType msg_pb.MessageType + retryCount int + isActive bool + isActiveMutex sync.Mutex +} + +// NewMessageSender initializes the consensus message sender. +func NewMessageSender(host p2p.Host) *MessageSender { + return &MessageSender{host: host, retryTimes: int(phaseDuration.Seconds()) / RetryIntervalInSec} +} + +// Reset resets the sender's state for new block +func (sender *MessageSender) Reset(blockNum uint64) { + sender.blockNumMutex.Lock() + sender.blockNum = blockNum + sender.blockNumMutex.Unlock() + sender.StopAllRetriesExceptCommitted() + sender.messagesToRetry.Range(func(key interface{}, value interface{}) bool { + if msgRetry, ok := value.(*MessageRetry); ok { + if msgRetry.msgType != msg_pb.MessageType_COMMITTED { + sender.messagesToRetry.Delete(key) + } + } + return true + }) +} + +// SendWithRetry sends message with retry logic. +func (sender *MessageSender) SendWithRetry(blockNum uint64, msgType msg_pb.MessageType, groups []p2p.GroupID, p2pMsg []byte) error { + willRetry := sender.retryTimes != 0 + msgRetry := MessageRetry{blockNum: blockNum, groups: groups, p2pMsg: p2pMsg, msgType: msgType, retryCount: 0, isActive: willRetry} + if willRetry { + sender.messagesToRetry.Store(msgType, &msgRetry) + go func() { + sender.Retry(&msgRetry) + }() + } + return sender.host.SendMessageToGroups(groups, p2pMsg) +} + +// SendWithoutRetry sends message without retry logic. +func (sender *MessageSender) SendWithoutRetry(groups []p2p.GroupID, p2pMsg []byte) error { + return sender.host.SendMessageToGroups(groups, p2pMsg) +} + +// Retry will retry the consensus message for times. +func (sender *MessageSender) Retry(msgRetry *MessageRetry) { + for { + time.Sleep(RetryIntervalInSec * time.Second) + + if msgRetry.retryCount >= sender.retryTimes { + // Retried enough times + return + } + + msgRetry.isActiveMutex.Lock() + if !msgRetry.isActive { + msgRetry.isActiveMutex.Unlock() + // Retry is stopped + return + } + msgRetry.isActiveMutex.Unlock() + + if msgRetry.msgType != msg_pb.MessageType_COMMITTED { + sender.blockNumMutex.Lock() + if msgRetry.blockNum < sender.blockNum { + sender.blockNumMutex.Unlock() + // Block already moved ahead, no need to retry old block's messages + return + } + sender.blockNumMutex.Unlock() + } + + msgRetry.retryCount++ + if err := sender.host.SendMessageToGroups(msgRetry.groups, msgRetry.p2pMsg); err != nil { + utils.GetLogInstance().Warn("[Retry] Failed re-sending consensus message", "groupID", msgRetry.groups, "blockNum", msgRetry.blockNum, "MsgType", msgRetry.msgType, "RetryCount", msgRetry.retryCount) + } else { + utils.GetLogInstance().Info("[Retry] Successfully resent consensus message", "groupID", msgRetry.groups, "blockNum", msgRetry.blockNum, "MsgType", msgRetry.msgType, "RetryCount", msgRetry.retryCount) + } + } +} + +// StopRetry stops the retry. +func (sender *MessageSender) StopRetry(msgType msg_pb.MessageType) { + data, ok := sender.messagesToRetry.Load(msgType) + if ok { + msgRetry := data.(*MessageRetry) + msgRetry.isActiveMutex.Lock() + msgRetry.isActive = false + msgRetry.isActiveMutex.Unlock() + } +} + +// StopAllRetriesExceptCommitted stops all the existing retries except committed message (which lives across consensus). +func (sender *MessageSender) StopAllRetriesExceptCommitted() { + sender.messagesToRetry.Range(func(k, v interface{}) bool { + if msgRetry, ok := v.(*MessageRetry); ok { + if msgRetry.msgType != msg_pb.MessageType_COMMITTED { + msgRetry.isActiveMutex.Lock() + msgRetry.isActive = false + msgRetry.isActiveMutex.Unlock() + } + } + return true + }) +} diff --git a/consensus/consensus_service.go b/consensus/consensus_service.go index 4d957a059..5a97f3efa 100644 --- a/consensus/consensus_service.go +++ b/consensus/consensus_service.go @@ -665,6 +665,7 @@ func (consensus *Consensus) updateConsensusInformation() { Str("leaderPubKey", leaderPubKey.SerializeToHexStr()). Msg("[SYNC] Most Recent LeaderPubKey Updated Based on BlockChain") consensus.LeaderPubKey = leaderPubKey + consensus.mode.SetMode(Normal) } } diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 8e78b816b..e455f5d47 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -129,7 +129,8 @@ func (consensus *Consensus) announce(block *types.Block) { } // Construct broadcast p2p message - if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { + + if err := consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_ANNOUNCE, []p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { consensus.getLogger().Warn(). Str("groupID", string(p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID)))). Msg("[Announce] Cannot send announce message") @@ -276,7 +277,8 @@ func (consensus *Consensus) prepare() { // Construct and send prepare message msgToSend := consensus.constructPrepareMessage() // TODO: this will not return immediatey, may block - if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { + + if err := consensus.msgSender.SendWithoutRetry([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { consensus.getLogger().Warn().Err(err).Msg("[OnAnnounce] Cannot send prepare message") } else { consensus.getLogger().Info(). @@ -397,7 +399,7 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) { return } - if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { + if err := consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_PREPARED, []p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { consensus.getLogger().Warn().Msg("[OnPrepare] Cannot send prepared message") } else { consensus.getLogger().Debug(). @@ -405,6 +407,9 @@ func (consensus *Consensus) onPrepare(msg *msg_pb.Message) { Uint64("BlockNum", consensus.blockNum). Msg("[OnPrepare] Sent Prepared Message!!") } + consensus.msgSender.StopRetry(msg_pb.MessageType_ANNOUNCE) + consensus.msgSender.StopRetry(msg_pb.MessageType_COMMITTED) // Stop retry committed msg of last consensus + consensus.getLogger().Debug(). Str("From", consensus.phase.String()). Str("To", Commit.String()). @@ -576,7 +581,7 @@ func (consensus *Consensus) onPrepared(msg *msg_pb.Message) { time.Sleep(consensus.delayCommit) } - if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { + if err := consensus.msgSender.SendWithoutRetry([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { consensus.getLogger().Warn().Msg("[OnPrepared] Cannot send commit message!!") } else { consensus.getLogger().Info(). @@ -700,6 +705,8 @@ func (consensus *Consensus) onCommit(msg *msg_pb.Message) { logger.Debug().Msg("[OnCommit] Commit Grace Period Ended") consensus.commitFinishChan <- viewID }(consensus.viewID) + + consensus.msgSender.StopRetry(msg_pb.MessageType_PREPARED) } if rewardThresholdIsMet { @@ -748,7 +755,8 @@ func (consensus *Consensus) finalizeCommits() { return } // if leader success finalize the block, send committed message to validators - if err := consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { + + if err := consensus.msgSender.SendWithRetry(block.NumberU64(), msg_pb.MessageType_COMMITTED, []p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)); err != nil { consensus.getLogger().Warn().Err(err).Msg("[Finalizing] Cannot send committed message") } else { consensus.getLogger().Info(). @@ -803,7 +811,7 @@ func (consensus *Consensus) onCommitted(msg *msg_pb.Message) { consensus.getLogger().Info().Msg("[OnCommitted] PublicKeys is Empty, Cannot update public keys") return } - consensus.getLogger().Info().Int("numKeys", len(pubKeys)).Msg("[OnCommitted] Update Shard Info and PublicKeys") + consensus.getLogger().Info().Int("numKeys", len(pubKeys)).Msg("[OnCommitted] Try to Update Shard Info and PublicKeys") for _, key := range pubKeys { if key.IsEqual(consensus.PubKey) { @@ -1171,6 +1179,8 @@ func (consensus *Consensus) Start(blockChannel chan *types.Block, stopChan chan } startTime = time.Now() + consensus.msgSender.Reset(newBlock.NumberU64()) + consensus.getLogger().Debug(). Int("numTxs", len(newBlock.Transactions())). Interface("consensus", consensus). diff --git a/consensus/view_change.go b/consensus/view_change.go index fda4d31d4..99aa58b1c 100644 --- a/consensus/view_change.go +++ b/consensus/view_change.go @@ -427,11 +427,12 @@ func (consensus *Consensus) onViewChange(msg *msg_pb.Message) { consensus.mode.SetViewID(recvMsg.ViewID) msgToSend := consensus.constructNewViewMessage() + consensus.getLogger().Warn(). Int("payloadSize", len(consensus.m1Payload)). Bytes("M1Payload", consensus.m1Payload). Msg("[onViewChange] Sent NewView Message") - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) + consensus.msgSender.SendWithRetry(consensus.blockNum, msg_pb.MessageType_NEWVIEW, []p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) consensus.viewID = recvMsg.ViewID consensus.ResetViewChangeState() diff --git a/core/core_test.go b/core/core_test.go index bbee042d4..d123da20c 100644 --- a/core/core_test.go +++ b/core/core_test.go @@ -11,7 +11,7 @@ import ( func TestIsEpochBlock(t *testing.T) { block1 := types.NewBlock(&types.Header{Number: big.NewInt(10)}, nil, nil) block2 := types.NewBlock(&types.Header{Number: big.NewInt(0)}, nil, nil) - block3 := types.NewBlock(&types.Header{Number: big.NewInt(327680)}, nil, nil) + block3 := types.NewBlock(&types.Header{Number: big.NewInt(344064)}, nil, nil) block4 := types.NewBlock(&types.Header{Number: big.NewInt(77)}, nil, nil) block5 := types.NewBlock(&types.Header{Number: big.NewInt(78)}, nil, nil) block6 := types.NewBlock(&types.Header{Number: big.NewInt(188)}, nil, nil) @@ -57,11 +57,11 @@ func TestIsEpochBlock(t *testing.T) { true, }, } - for _, test := range tests { + for i, test := range tests { ShardingSchedule = test.schedule r := IsEpochBlock(test.block) if r != test.expected { - t.Errorf("expected: %v, got: %v\n", test.expected, r) + t.Errorf("index: %v, expected: %v, got: %v\n", i, test.expected, r) } } } diff --git a/internal/configs/node/config.go b/internal/configs/node/config.go index d544ffa9d..c1a0d79f4 100644 --- a/internal/configs/node/config.go +++ b/internal/configs/node/config.go @@ -21,11 +21,7 @@ type Role byte // All constants for different node roles. const ( Unknown Role = iota - ShardLeader - ShardValidator - BeaconLeader - BeaconValidator - NewNode + Validator ClientNode WalletNode ExplorerNode @@ -35,16 +31,8 @@ func (role Role) String() string { switch role { case Unknown: return "Unknown" - case ShardLeader: - return "ShardLeader" - case ShardValidator: - return "ShardValidator" - case BeaconLeader: - return "BeaconLeader" - case BeaconValidator: - return "BeaconValidator" - case NewNode: - return "NewNode" + case Validator: + return "Validator" case ClientNode: return "ClientNode" case WalletNode: @@ -145,7 +133,7 @@ func GetDefaultConfig() *ConfigType { } func (conf *ConfigType) String() string { - return fmt.Sprintf("%s/%s/%s:%v,%v,%v", conf.beacon, conf.group, conf.client, conf.isClient, conf.IsBeacon(), conf.ShardID) + return fmt.Sprintf("%s/%s/%s:%v,%v", conf.beacon, conf.group, conf.client, conf.isClient, conf.ShardID) } // SetBeaconGroupID set the groupID for beacon group @@ -168,11 +156,6 @@ func (conf *ConfigType) SetIsClient(b bool) { conf.isClient = b } -// SetIsBeacon sets the isBeacon configuration -func (conf *ConfigType) SetIsBeacon(b bool) { - conf.isBeacon = b -} - // SetShardID set the ShardID func (conf *ConfigType) SetShardID(s uint32) { conf.ShardID = s @@ -203,11 +186,6 @@ func (conf *ConfigType) IsClient() bool { return conf.isClient } -// IsBeacon returns the isBeacon configuration -func (conf *ConfigType) IsBeacon() bool { - return conf.isBeacon -} - // Role returns the role func (conf *ConfigType) Role() Role { return conf.role diff --git a/internal/configs/node/config_test.go b/internal/configs/node/config_test.go index 9b53c75e5..cf4da0a5b 100644 --- a/internal/configs/node/config_test.go +++ b/internal/configs/node/config_test.go @@ -26,7 +26,6 @@ func TestNodeConfigSingleton(t *testing.T) { func TestNodeConfigMultiple(t *testing.T) { // init 3 configs - c := GetShardConfig(2) d := GetShardConfig(1) e := GetShardConfig(0) f := GetShardConfig(42) @@ -35,16 +34,6 @@ func TestNodeConfigMultiple(t *testing.T) { t.Errorf("expecting nil, got: %v", f) } - if c.IsBeacon() != false { - t.Errorf("expecting the node to not be beacon yet, got: %v", c.IsBeacon()) - } - - c.SetIsBeacon(true) - - if c.IsBeacon() != true { - t.Errorf("expecting the node to be beacon, got: %v", c.IsBeacon()) - } - d.SetShardGroupID("abcd") if d.GetShardGroupID() != "abcd" { t.Errorf("expecting abcd, got: %v", d.GetShardGroupID()) @@ -59,12 +48,4 @@ func TestNodeConfigMultiple(t *testing.T) { if e.IsClient() != false { t.Errorf("expecting false, got: %v", e.IsClient()) } - - c.SetRole(NewNode) - if c.Role() != NewNode { - t.Errorf("expecting NewNode, got: %s", c.Role()) - } - if c.Role().String() != "NewNode" { - t.Errorf("expecting NewNode, got: %s", c.Role().String()) - } } diff --git a/internal/configs/sharding/localnet.go b/internal/configs/sharding/localnet.go index 07a6123e6..953095e8b 100644 --- a/internal/configs/sharding/localnet.go +++ b/internal/configs/sharding/localnet.go @@ -16,11 +16,11 @@ const ( localnetV1Epoch = 1 localnetV2Epoch = 2 - localnetEpochBlock1 = 36 - twoOne = 11 + localnetEpochBlock1 = 10 + twoOne = 5 localnetVdfDifficulty = 5000 // This takes about 10s to finish the vdf -) + func (localnetSchedule) InstanceForEpoch(epoch *big.Int) Instance { switch { diff --git a/internal/configs/sharding/mainnet.go b/internal/configs/sharding/mainnet.go index 17e389ab2..06303b13a 100644 --- a/internal/configs/sharding/mainnet.go +++ b/internal/configs/sharding/mainnet.go @@ -7,7 +7,7 @@ import ( ) const ( - mainnetEpochBlock1 = 327680 // 20 * 2^14 + mainnetEpochBlock1 = 344064 // 21 * 2^14 blocksPerShard = 16384 // 2^14 mainnetV1Epoch = 1 @@ -22,7 +22,7 @@ type mainnetSchedule struct{} func (mainnetSchedule) InstanceForEpoch(epoch *big.Int) Instance { switch { case epoch.Cmp(big.NewInt(mainnetV1Epoch)) >= 0: - // first resharding epoch around 07/29/2019 7:30am PDT + // first resharding epoch around 07/30/2019 10:30pm PDT return mainnetV1 default: // genesis return mainnetV0 @@ -61,7 +61,7 @@ func (ms mainnetSchedule) VdfDifficulty() int { var mainnetReshardingEpoch = []*big.Int{big.NewInt(0), big.NewInt(mainnetV1Epoch)} var mainnetV0 = MustNewInstance(4, 150, 112, genesis.HarmonyAccounts, genesis.FoundationalNodeAccounts, mainnetReshardingEpoch) -var mainnetV1 = MustNewInstance(4, 151, 112, genesis.HarmonyAccounts, genesis.FoundationalNodeAccountsV1, mainnetReshardingEpoch) +var mainnetV1 = MustNewInstance(4, 152, 112, genesis.HarmonyAccounts, genesis.FoundationalNodeAccountsV1, mainnetReshardingEpoch) //var mainnetV2 = MustNewInstance(8, 200, 100) //var mainnet6400 = MustNewInstance(16, 400, 50) diff --git a/internal/configs/sharding/shardingconfig_test.go b/internal/configs/sharding/shardingconfig_test.go index f1b05ed11..89abd8f45 100644 --- a/internal/configs/sharding/shardingconfig_test.go +++ b/internal/configs/sharding/shardingconfig_test.go @@ -51,22 +51,30 @@ func TestCalcEpochNumber(t *testing.T) { }, { 327680, - big.NewInt(1), + big.NewInt(0), }, { 344064, - big.NewInt(2), + big.NewInt(1), }, { 344063, + big.NewInt(0), + }, + { + 344065, big.NewInt(1), }, + { + 360448, + big.NewInt(2), + }, } - for _, test := range tests { + for i, test := range tests { ep := MainnetSchedule.CalcEpochNumber(test.block) if ep.Cmp(test.epoch) != 0 { - t.Errorf("CalcEpochNumber error: got %v, expect %v\n", ep, test.epoch) + t.Errorf("CalcEpochNumber error: index %v, got %v, expect %v\n", i, ep, test.epoch) } } } diff --git a/internal/genesis/foundational.go b/internal/genesis/foundational.go index a13ef1e3a..a3e02701a 100644 --- a/internal/genesis/foundational.go +++ b/internal/genesis/foundational.go @@ -314,4 +314,8 @@ var FoundationalNodeAccountsV1 = []DeployAccount{ {Index: "153", Address: "one1nv4auwyhu7nnkcgwk4dx8z3lqt9xqvp6vw57p8", BlsPublicKey: "d9565fbcbf88929df0dc8a8b143a0172a4a038f90edc1cf91711d152b5f7fb626a1c9a9ce40d40e54a443f08cc991818"}, {Index: "154", Address: "one1wnhm4jaq96gzk7xa0ch9alrez3lm3zuu3qaxfg", BlsPublicKey: "b506426b514ee39d3c4746cce5de4720411151cf65d50106b5bd90d50fe2099bd924967517dfa0c08871fa83ba581b00"}, {Index: "155", Address: "one10uyfuzaztcccz97w29v0k64rzmhj4k862kfh5q", BlsPublicKey: "e75e5a222bd9e9004385d593194606f48b3e6bf8a95c68830ea1cd8f56bbcdedcb680c9598c66230ea0c2b79a6c58296"}, + {Index: "156", Address: "one1s3dx73sa5dzrksmds5recptale8pxsa4d4hzt4", BlsPublicKey: "87d4f6c37073a108b94a6e7799f62b2051c44892328bdcb8e5dd4f4596b1ba2952947c744b5daf183e9f8361282c9101"}, + {Index: "157", Address: "one1vfglvsfuk52025r5apqlfaqky37462tsdjeemf", BlsPublicKey: "6d320742fbff3aa1877aadb9316a865edbdecb0fb74fc973272d73ec1deaff131b653c3ab7a2b26753c717347f450a00"}, + {Index: "158", Address: "one1pjn8zz5av5ddenaxmu6qrs381xuapygkeatxga", BlsPublicKey: "71c907378831009328f28db0e324848767b58e49eae1f2774e81276e25732bfea5ed8a567fed15afb010be05b9732b16"}, + {Index: "159", Address: "one1fzh923dkauvyye7w68nc38j2dw54gldu5mheaz", BlsPublicKey: "b5c94a5071f942c77f3599098430b8f2dbd6da70c5ef830192bdef5638908cd1fa188059d7aecc8b721116b946c4cc8e"}, } diff --git a/node/node.go b/node/node.go index 223645385..cba5c415c 100644 --- a/node/node.go +++ b/node/node.go @@ -2,15 +2,11 @@ package node import ( "crypto/ecdsa" - "encoding/hex" "fmt" - "math/big" "sync" "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/log" - "github.com/harmony-one/bls/ffi/go/bls" "github.com/harmony-one/harmony/accounts" "github.com/harmony-one/harmony/api/client" clientService "github.com/harmony-one/harmony/api/client/service" @@ -391,46 +387,41 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, chainDBFactory shardc return &node } -// InitShardState initialize genesis shard state and update committee pub keys for consensus and drand -func (node *Node) InitShardState(isGenesis bool) (err error) { - logger := utils.GetLogInstance().New("isGenesis", isGenesis) - getLogger := func() log.Logger { return utils.WithCallerSkip(logger, 1) } +// InitShardState initialize shard state from latest epoch and update committee pub keys for consensus and drand +func (node *Node) InitShardState() (err error) { if node.Consensus == nil { - getLogger().Crit("consensus is nil; cannot figure out shard ID") + return ctxerror.New("[InitShardState] consenus is nil; Cannot figure out shardID") } shardID := node.Consensus.ShardID - logger = logger.New("shardID", shardID) - getLogger().Info("initializing shard state") // Get genesis epoch shard state from chain - genesisEpoch := big.NewInt(core.GenesisEpoch) - shardState, err := node.Beaconchain().GetShardState(genesisEpoch, nil) - if err != nil { - return ctxerror.New("cannot read genesis shard state").WithCause(err) + blockNum := node.Blockchain().CurrentBlock().NumberU64() + node.Consensus.SetMode(consensus.Listening) + epoch := core.ShardingSchedule.CalcEpochNumber(blockNum) + utils.Logger().Info(). + Uint64("blockNum", blockNum). + Uint32("shardID", shardID). + Uint64("epoch", epoch.Uint64()). + Msg("[InitShardState] Try To Get PublicKeys from database") + pubKeys := core.GetPublicKeys(epoch, shardID) + if len(pubKeys) == 0 { + return ctxerror.New( + "[InitShardState] PublicKeys is Empty, Cannot update public keys", + "shardID", shardID, + "blockNum", blockNum) } - getLogger().Info("Successfully loaded epoch shard state") - // Update validator public keys - committee := shardState.FindCommitteeByID(shardID) - if committee == nil { - return ctxerror.New("our shard is not found in genesis shard state", - "shardID", shardID) - } - pubKeys := []*bls.PublicKey{} - for _, node := range committee.NodeList { - pubKey := &bls.PublicKey{} - pubKeyBytes := node.BlsPublicKey[:] - err = pubKey.Deserialize(pubKeyBytes) - if err != nil { - return ctxerror.New("cannot deserialize BLS public key", - "shardID", shardID, - "pubKeyBytes", hex.EncodeToString(pubKeyBytes), - ).WithCause(err) + for _, key := range pubKeys { + if key.IsEqual(node.Consensus.PubKey) { + utils.Logger().Info(). + Uint64("blockNum", blockNum). + Int("numPubKeys", len(pubKeys)). + Msg("[InitShardState] Successfully updated public keys") + node.Consensus.UpdatePublicKeys(pubKeys) + node.Consensus.SetMode(consensus.Normal) + return nil } - pubKeys = append(pubKeys, pubKey) } - getLogger().Info("initialized shard state", "numPubKeys", len(pubKeys)) - node.Consensus.UpdatePublicKeys(pubKeys) // TODO: Disable drand. Currently drand isn't functioning but we want to compeletely turn it off for full protection. // node.DRand.UpdatePublicKeys(pubKeys) return nil @@ -475,7 +466,6 @@ func (node *Node) initNodeConfiguration() (service.NodeConfig, chan p2p.Peer) { chanPeer := make(chan p2p.Peer) nodeConfig := service.NodeConfig{ - IsBeacon: node.NodeConfig.IsBeacon(), IsClient: node.NodeConfig.IsClient(), Beacon: p2p.GroupIDBeacon, ShardGroupID: node.NodeConfig.GetShardGroupID(), diff --git a/node/node_handler.go b/node/node_handler.go index 4b3aa7208..d2113f7aa 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -29,7 +29,6 @@ import ( "github.com/harmony-one/harmony/contracts/structs" "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core/types" - common2 "github.com/harmony-one/harmony/internal/common" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/internal/ctxerror" "github.com/harmony-one/harmony/internal/utils" @@ -148,10 +147,9 @@ func (node *Node) messageHandler(content []byte, sender libp2p_peer.ID) { utils.Logger().Debug().Msg("NET: Received staking message") msgPayload, _ := proto.GetStakingMessagePayload(content) // Only beacon leader processes staking txn - if node.NodeConfig.Role() != nodeconfig.BeaconLeader { - return + if node.Consensus != nil && node.Consensus.ShardID == 0 && node.Consensus.IsLeader() { + node.processStakingMessage(msgPayload) } - node.processStakingMessage(msgPayload) case proto.Node: actionType := proto_node.MessageType(msgType) switch actionType { @@ -173,10 +171,11 @@ func (node *Node) messageHandler(content []byte, sender libp2p_peer.ID) { } else { // for non-beaconchain node, subscribe to beacon block broadcast role := node.NodeConfig.Role() - if proto_node.BlockMessageType(msgPayload[0]) == proto_node.Sync && (role == nodeconfig.ShardValidator || role == nodeconfig.ShardLeader || role == nodeconfig.NewNode) { + if proto_node.BlockMessageType(msgPayload[0]) == proto_node.Sync && role == nodeconfig.Validator { utils.Logger().Info(). Uint64("block", blocks[0].NumberU64()). Msg("Block being handled by block channel") + for _, block := range blocks { node.BeaconBlockChannel <- block } @@ -590,16 +589,6 @@ func (node *Node) pingMessageHandler(msgPayload []byte, sender libp2p_peer.ID) i err = ctxerror.New("cannot convert BLS public key").WithCause(err) ctxerror.Log15(utils.GetLogger().Warn, err) } - if genesisNode := getGenesisNodeByConsensusKey(k); genesisNode != nil { - utils.Logger().Info(). - Uint32("genesisShardID", genesisNode.ShardID). - Int("genesisMemberIndex", genesisNode.MemberIndex). - Str("genesisStakingAccount", common2.MustAddressToBech32(genesisNode.NodeID.EcdsaAddress)) - } else { - utils.Logger().Info(). - Str("BlsPubKey", peer.ConsensusPubKey.SerializeToHexStr()). - Msg("cannot find genesis node") - } utils.Logger().Info(). Str("Peer Version", ping.NodeVer). Interface("PeerID", peer). @@ -766,7 +755,7 @@ func (node *Node) epochShardStateMessageHandler(msgPayload []byte) error { if err != nil { return ctxerror.New("Can't get shard state message").WithCause(err) } - if node.Consensus == nil && node.NodeConfig.Role() != nodeconfig.NewNode { + if node.Consensus == nil { return nil } receivedEpoch := big.NewInt(int64(epochShardState.Epoch)) diff --git a/node/node_newblock.go b/node/node_newblock.go index 797dea044..74135dfff 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -15,8 +15,7 @@ import ( // Constants of lower bound limit of a new block. const ( - ConsensusTimeOut = 20 - PeriodicBlock = 200 * time.Millisecond + PeriodicBlock = 200 * time.Millisecond ) // WaitForConsensusReadyv2 listen for the readiness signal from consensus and generate new block for consensus. @@ -31,7 +30,6 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch Msg("Waiting for Consensus ready") time.Sleep(30 * time.Second) // Wait for other nodes to be ready (test-only) - timeoutCount := 0 var newBlock *types.Block // Set up the very first deadline. @@ -43,18 +41,6 @@ func (node *Node) WaitForConsensusReadyv2(readySignal chan struct{}, stopChan ch utils.Logger().Debug(). Msg("Consensus new block proposal: STOPPED!") return - case <-time.After(ConsensusTimeOut * time.Second): - if node.Consensus.PubKey.IsEqual(node.Consensus.LeaderPubKey) { - utils.Logger().Debug(). - Int("count", timeoutCount). - Msg("Leader consensus timeout, retry!") - //node.Consensus.ResetState() - timeoutCount++ - if newBlock != nil { - // Send the new block to Consensus so it can be confirmed. - node.BlockChannel <- newBlock - } - } case <-readySignal: for { time.Sleep(PeriodicBlock) diff --git a/node/node_syncing.go b/node/node_syncing.go index e7fc8c23d..5ca9e318d 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -28,7 +28,7 @@ const ( lastMileThreshold = 4 inSyncThreshold = 1 // unit in number of block SyncFrequency = 10 // unit in second - MinConnectedPeers = 5 // minimum number of peers connected to in node syncing + MinConnectedPeers = 10 // minimum number of peers connected to in node syncing ) // getNeighborPeers is a helper function to return list of peers @@ -228,7 +228,7 @@ func (node *Node) SendNewBlockToUnsync() { } // CalculateResponse implements DownloadInterface on Node object. -func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest) (*downloader_pb.DownloaderResponse, error) { +func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest, incomingPeer string) (*downloader_pb.DownloaderResponse, error) { response := &downloader_pb.DownloaderResponse{} switch request.Type { case downloader_pb.DownloaderRequest_HEADER: @@ -248,7 +248,7 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest) (* startHeight := startBlock.NumberU64() endHeight := node.Blockchain().CurrentBlock().NumberU64() if startHeight >= endHeight { - utils.GetLogInstance().Debug("[SYNC] GetBlockHashes Request: I am not higher than requested node", "myHeight", endHeight, "requestHeight", startHeight) + utils.GetLogInstance().Debug("[SYNC] GetBlockHashes Request: I am not higher than requested node", "myHeight", endHeight, "requestHeight", startHeight, "incomingIP", request.Ip, "incomingPort", request.Port, "incomingPeer", incomingPeer) return response, nil } diff --git a/node/service_setup.go b/node/service_setup.go index 9f1b0d0d6..763aa7ef6 100644 --- a/node/service_setup.go +++ b/node/service_setup.go @@ -15,7 +15,7 @@ import ( "github.com/harmony-one/harmony/p2p" ) -func (node *Node) setupForShardLeader() { +func (node *Node) setupForValidator() { nodeConfig, chanPeer := node.initNodeConfiguration() // Register peer discovery service. No need to do staking for beacon chain node. @@ -28,56 +28,13 @@ func (node *Node) setupForShardLeader() { node.serviceManager.RegisterService(service.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReadyv2)) // Register client support service. node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.Blockchain().State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) -} - -func (node *Node) setupForShardValidator() { - nodeConfig, chanPeer := node.initNodeConfiguration() - // Register client support service. - node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.Blockchain().State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) - // Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node. - node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer)) - // Register networkinfo service. "0" is the beacon shard ID - node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, node.NodeConfig.GetShardGroupID(), chanPeer, nil)) - // Register consensus service. - node.serviceManager.RegisterService(service.Consensus, consensus.New(node.BlockChannel, node.Consensus, node.startConsensus)) - // Register new block service. - node.serviceManager.RegisterService(service.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReadyv2)) - -} - -func (node *Node) setupForBeaconLeader() { - nodeConfig, chanPeer := node.initNodeConfiguration() - - // Register peer discovery service. No need to do staking for beacon chain node. - node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, nil)) - // Register networkinfo service. - node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, node.NodeConfig.GetShardGroupID(), chanPeer, nil)) - // Register client support service. - node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.Blockchain().State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) - // Register consensus service. - node.serviceManager.RegisterService(service.Consensus, consensus.New(node.BlockChannel, node.Consensus, node.startConsensus)) - // Register new block service. - node.serviceManager.RegisterService(service.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReadyv2)) // Register randomness service // TODO: Disable drand. Currently drand isn't functioning but we want to compeletely turn it off for full protection. // Enable it back after mainnet. + // Need Dynamically enable for beacon validators // node.serviceManager.RegisterService(service.Randomness, randomness.New(node.DRand)) -} - -func (node *Node) setupForBeaconValidator() { - nodeConfig, chanPeer := node.initNodeConfiguration() - // Register peer discovery service. No need to do staking for beacon chain node. - node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, nil)) - // Register networkinfo service. - node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, node.NodeConfig.GetShardGroupID(), chanPeer, nil)) - // Register consensus service. - node.serviceManager.RegisterService(service.Consensus, consensus.New(node.BlockChannel, node.Consensus, node.startConsensus)) - // Register new block service. - node.serviceManager.RegisterService(service.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReadyv2)) - // Register client support service. - node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.Blockchain().State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) } func (node *Node) setupForNewNode() { @@ -126,16 +83,8 @@ func (node *Node) ServiceManagerSetup() { node.serviceManager = &service.Manager{} node.serviceMessageChan = make(map[service.Type]chan *msg_pb.Message) switch node.NodeConfig.Role() { - case nodeconfig.ShardLeader: - node.setupForShardLeader() - case nodeconfig.ShardValidator: - node.setupForShardValidator() - case nodeconfig.BeaconLeader: - node.setupForBeaconLeader() - case nodeconfig.BeaconValidator: - node.setupForBeaconValidator() - case nodeconfig.NewNode: - node.setupForNewNode() + case nodeconfig.Validator: + node.setupForValidator() case nodeconfig.ClientNode: node.setupForClientNode() case nodeconfig.ExplorerNode: diff --git a/test/configs/local-resharding.txt b/test/configs/local-resharding.txt index 1539105f5..2a4d3199c 100644 --- a/test/configs/local-resharding.txt +++ b/test/configs/local-resharding.txt @@ -13,13 +13,13 @@ 127.0.0.1 9012 validator one103q7qe5t2505lypvltkqtddaef5tzfxwsse4z7 678ec9670899bf6af85b877058bea4fc1301a5a3a376987e826e3ca150b80e3eaadffedad0fedfa111576fa76ded980c 127.0.0.1 9013 validator one129r9pj3sk0re76f7zs3qz92rggmdgjhtwge62k 63f479f249c59f0486fda8caa2ffb247209489dae009dfde6144ff38c370230963d360dffd318cfb26c213320e89a512 -127.0.0.1 9100 newnode one1ghkz3frhske7emk79p7v2afmj4a5t0kmjyt4s5 eca09c1808b729ca56f1b5a6a287c6e1c3ae09e29ccf7efa35453471fcab07d9f73cee249e2b91f5ee44eb9618be3904 -127.0.0.1 9101 newnode one1d7jfnr6yraxnrycgaemyktkmhmajhp8kl0yahv f47238daef97d60deedbde5302d05dea5de67608f11f406576e363661f7dcbc4a1385948549b31a6c70f6fde8a391486 -127.0.0.1 9102 newnode one1r4zyyjqrulf935a479sgqlpa78kz7zlcg2jfen fc4b9c535ee91f015efff3f32fbb9d32cdd9bfc8a837bb3eee89b8fff653c7af2050a4e147ebe5c7233dc2d5df06ee0a -127.0.0.1 9103 newnode one1p7ht2d4kl8ve7a8jxw746yfnx4wnfxtp8jqxwe ca86e551ee42adaaa6477322d7db869d3e203c00d7b86c82ebee629ad79cb6d57b8f3db28336778ec2180e56a8e07296 -127.0.0.1 9104 newnode one1z05g55zamqzfw9qs432n33gycdmyvs38xjemyl 95117937cd8c09acd2dfae847d74041a67834ea88662a7cbed1e170350bc329e53db151e5a0ef3e712e35287ae954818 -127.0.0.1 9105 newnode one1ljznytjyn269azvszjlcqvpcj6hjm822yrcp2e 68ae289d73332872ec8d04ac256ca0f5453c88ad392730c5741b6055bc3ec3d086ab03637713a29f459177aaa8340615 -127.0.0.1 9107 newnode one1uyshu2jgv8w465yc8kkny36thlt2wvel89tcmg 1c1fb28d2de96e82c3d9b4917eb54412517e2763112a3164862a6ed627ac62e87ce274bb4ea36e6a61fb66a15c263a06 -127.0.0.1 9108 newnode one103q7qe5t2505lypvltkqtddaef5tzfxwsse4z7 b179c4fdc0bee7bd0b6698b792837dd13404d3f985b59d4a9b1cd0641a76651e271518b61abbb6fbebd4acf963358604 -127.0.0.1 9109 newnode one1658znfwf40epvy7e46cqrmzyy54h4n0qa73nep 576d3c48294e00d6be4a22b07b66a870ddee03052fe48a5abbd180222e5d5a1f8946a78d55b025de21635fd743bbad90 -127.0.0.1 9110 newnode one1d2rngmem4x2c6zxsjjz29dlah0jzkr0k2n88wc 16513c487a6bb76f37219f3c2927a4f281f9dd3fd6ed2e3a64e500de6545cf391dd973cc228d24f9bd01efe94912e714 +127.0.0.1 9100 validator one1ghkz3frhske7emk79p7v2afmj4a5t0kmjyt4s5 eca09c1808b729ca56f1b5a6a287c6e1c3ae09e29ccf7efa35453471fcab07d9f73cee249e2b91f5ee44eb9618be3904 +127.0.0.1 9101 validator one1d7jfnr6yraxnrycgaemyktkmhmajhp8kl0yahv f47238daef97d60deedbde5302d05dea5de67608f11f406576e363661f7dcbc4a1385948549b31a6c70f6fde8a391486 +127.0.0.1 9102 validator one1r4zyyjqrulf935a479sgqlpa78kz7zlcg2jfen fc4b9c535ee91f015efff3f32fbb9d32cdd9bfc8a837bb3eee89b8fff653c7af2050a4e147ebe5c7233dc2d5df06ee0a +127.0.0.1 9103 validator one1p7ht2d4kl8ve7a8jxw746yfnx4wnfxtp8jqxwe ca86e551ee42adaaa6477322d7db869d3e203c00d7b86c82ebee629ad79cb6d57b8f3db28336778ec2180e56a8e07296 +127.0.0.1 9104 validator one1z05g55zamqzfw9qs432n33gycdmyvs38xjemyl 95117937cd8c09acd2dfae847d74041a67834ea88662a7cbed1e170350bc329e53db151e5a0ef3e712e35287ae954818 +127.0.0.1 9105 validator one1ljznytjyn269azvszjlcqvpcj6hjm822yrcp2e 68ae289d73332872ec8d04ac256ca0f5453c88ad392730c5741b6055bc3ec3d086ab03637713a29f459177aaa8340615 +127.0.0.1 9107 validator one1uyshu2jgv8w465yc8kkny36thlt2wvel89tcmg 1c1fb28d2de96e82c3d9b4917eb54412517e2763112a3164862a6ed627ac62e87ce274bb4ea36e6a61fb66a15c263a06 +127.0.0.1 9108 validator one103q7qe5t2505lypvltkqtddaef5tzfxwsse4z7 b179c4fdc0bee7bd0b6698b792837dd13404d3f985b59d4a9b1cd0641a76651e271518b61abbb6fbebd4acf963358604 +127.0.0.1 9109 validator one1658znfwf40epvy7e46cqrmzyy54h4n0qa73nep 576d3c48294e00d6be4a22b07b66a870ddee03052fe48a5abbd180222e5d5a1f8946a78d55b025de21635fd743bbad90 +127.0.0.1 9110 validator one1d2rngmem4x2c6zxsjjz29dlah0jzkr0k2n88wc 16513c487a6bb76f37219f3c2927a4f281f9dd3fd6ed2e3a64e500de6545cf391dd973cc228d24f9bd01efe94912e714 diff --git a/test/deploy.sh b/test/deploy.sh index c915fc14b..da27224d9 100755 --- a/test/deploy.sh +++ b/test/deploy.sh @@ -177,13 +177,6 @@ while IFS='' read -r line || [[ -n "$line" ]]; do case "${mode}" in *archival|archival) args=("${args[@]}" -is_archival);; esac case "${mode}" in explorer*) args=("${args[@]}" -is_genesis=false -is_explorer=true -shard_id=0);; esac case "${mode}" in - newnode) - sleep "${NUM_NN}" - NUM_NN=$((${NUM_NN} + 1)) - args=("${args[@]}" -is_newnode) - ;; - esac - case "${mode}" in client) ;; *) $DRYRUN "${ROOT}/bin/harmony" "${args[@]}" "${extra_args[@]}" 2>&1 | tee -a "${LOG_FILE}" &;; esac