From f00c90e3e62340aaaa11f38cb0a8417917ae6ede Mon Sep 17 00:00:00 2001 From: Jacky Wang Date: Mon, 15 Feb 2021 14:26:38 -0800 Subject: [PATCH] Graceful shutdown - All registered services are shutdown gracefully. (#3533) 1. Refactored service manager with cleaner interface. 2. Add prometheus to the service manager. 3. Graceful shutdown of the services (including consensus). 4. Some code refactor regarding consensus graceful shutdown. Co-authored-by: Rongjian Lan --- api/service/blockproposal/service.go | 28 +--- api/service/consensus/service.go | 18 +-- api/service/explorer/service.go | 10 +- api/service/manager.go | 187 ++++++++++------------- api/service/manager_test.go | 168 ++++++++++++++++++--- api/service/networkinfo/service.go | 22 +-- api/service/networkinfo/service_test.go | 4 +- api/service/prometheus/service.go | 71 +++++---- cmd/harmony/main.go | 106 +++++++------ consensus/consensus_v2.go | 17 +++ node/api.go | 12 -- node/node.go | 189 ++++++++++++++++-------- node/node_newblock.go | 6 +- node/service_setup.go | 51 +++---- p2p/host.go | 13 ++ rosetta/rosetta.go | 3 + 16 files changed, 536 insertions(+), 369 deletions(-) diff --git a/api/service/blockproposal/service.go b/api/service/blockproposal/service.go index cfb4efa04..925312e79 100644 --- a/api/service/blockproposal/service.go +++ b/api/service/blockproposal/service.go @@ -22,38 +22,26 @@ func New(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, wa return &Service{readySignal: readySignal, commitSigsChan: commitSigsChan, waitForConsensusReady: waitForConsensusReady} } -// StartService starts block proposal service. -func (s *Service) StartService() { +// Start starts block proposal service. +func (s *Service) Start() error { s.stopChan = make(chan struct{}) s.stoppedChan = make(chan struct{}) - s.Init() - s.Run(s.stopChan, s.stoppedChan) -} - -// Init initializes block proposal service. 
-func (s *Service) Init() { + s.run(s.stopChan, s.stoppedChan) + return nil } -// Run runs block proposal. -func (s *Service) Run(stopChan chan struct{}, stoppedChan chan struct{}) { +func (s *Service) run(stopChan chan struct{}, stoppedChan chan struct{}) { s.waitForConsensusReady(s.readySignal, s.commitSigsChan, s.stopChan, s.stoppedChan) } -// StopService stops block proposal service. -func (s *Service) StopService() { +// Stop stops block proposal service. +func (s *Service) Stop() error { utils.Logger().Info().Msg("Stopping block proposal service.") s.stopChan <- struct{}{} <-s.stoppedChan utils.Logger().Info().Msg("Role conversion stopped.") -} - -// NotifyService notify service -func (s *Service) NotifyService(params map[string]interface{}) {} - -// SetMessageChan sets up message channel to service. -func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { - s.messageChan = messageChan + return nil } // APIs for the services. diff --git a/api/service/consensus/service.go b/api/service/consensus/service.go index 483fc4310..144979da6 100644 --- a/api/service/consensus/service.go +++ b/api/service/consensus/service.go @@ -23,29 +23,23 @@ func New(blockChannel chan *types.Block, consensus *consensus.Consensus, startCh return &Service{blockChannel: blockChannel, consensus: consensus, startChan: startChan} } -// StartService starts consensus service. -func (s *Service) StartService() { +// Start starts consensus service. +func (s *Service) Start() error { utils.Logger().Info().Msg("[consensus/service] Starting consensus service.") s.stopChan = make(chan struct{}) s.stoppedChan = make(chan struct{}) s.consensus.Start(s.blockChannel, s.stopChan, s.stoppedChan, s.startChan) s.consensus.WaitForNewRandomness() + return nil } -// StopService stops consensus service. -func (s *Service) StopService() { +// Stop stops consensus service. 
+func (s *Service) Stop() error { utils.Logger().Info().Msg("Stopping consensus service.") s.stopChan <- struct{}{} <-s.stoppedChan utils.Logger().Info().Msg("Consensus service stopped.") -} - -// NotifyService notify service -func (s *Service) NotifyService(params map[string]interface{}) {} - -// SetMessageChan sets up message channel to service. -func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { - s.messageChan = messageChan + return s.consensus.Close() } // APIs for the services. diff --git a/api/service/explorer/service.go b/api/service/explorer/service.go index eaf78c288..2f554d639 100644 --- a/api/service/explorer/service.go +++ b/api/service/explorer/service.go @@ -54,21 +54,23 @@ func New(selfPeer *p2p.Peer, ss *syncing.StateSync, bc *core.BlockChain) *Servic return &Service{IP: selfPeer.IP, Port: selfPeer.Port, stateSync: ss, blockchain: bc} } -// StartService starts explorer service. -func (s *Service) StartService() { +// Start starts explorer service. +func (s *Service) Start() error { utils.Logger().Info().Msg("Starting explorer service.") s.Init() s.server = s.Run() + return nil } -// StopService shutdowns explorer service. -func (s *Service) StopService() { +// Stop shuts down explorer service. +func (s *Service) Stop() error { utils.Logger().Info().Msg("Shutting down explorer service.") if err := s.server.Shutdown(context.Background()); err != nil { utils.Logger().Error().Err(err).Msg("Error when shutting down explorer server") } else { utils.Logger().Info().Msg("Shutting down explorer server successfully") } + return nil } // GetExplorerPort returns the port serving explorer dashboard. This port is explorerPortDifference less than the node port.
diff --git a/api/service/manager.go b/api/service/manager.go index b552dfc0d..baaaba29b 100644 --- a/api/service/manager.go +++ b/api/service/manager.go @@ -1,19 +1,12 @@ package service import ( + "fmt" + "github.com/ethereum/go-ethereum/rpc" - msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/internal/utils" -) - -// ActionType is the input for Service Manager to operate. -type ActionType byte - -// Constants for Action Type. -const ( - Start ActionType = iota - Stop - Notify + "github.com/pkg/errors" + "github.com/rs/zerolog" ) // Type is service type. @@ -21,11 +14,13 @@ type Type byte // Constants for Type. const ( - ClientSupport Type = iota + UnknownService Type = iota + ClientSupport SupportExplorer Consensus BlockProposal NetworkInfo + Prometheus ) func (t Type) String() string { @@ -40,139 +35,111 @@ func (t Type) String() string { return "BlockProposal" case NetworkInfo: return "NetworkInfo" + case Prometheus: + return "Prometheus" default: return "Unknown" } } -// Action is type of service action. -type Action struct { - Action ActionType - ServiceType Type - Params map[string]interface{} -} - -// Interface is the collection of functions any service needs to implement. -type Interface interface { - StartService() - SetMessageChan(msgChan chan *msg_pb.Message) - StopService() - NotifyService(map[string]interface{}) - - // APIs retrieves the list of RPC descriptors the service provides - APIs() []rpc.API +// Service is the collection of functions any service needs to implement. +type Service interface { + Start() error + Stop() error + APIs() []rpc.API // the list of RPC descriptors the service provides } // Manager stores all services for service manager. type Manager struct { - services map[Type]Interface - actionChannel chan *Action + services []Service + serviceMap map[Type]Service + + logger zerolog.Logger } -// GetServices returns all registered services. 
-func (m *Manager) GetServices() map[Type]Interface { - return m.services +// NewManager creates a new manager +func NewManager() *Manager { + return &Manager{ + services: nil, + serviceMap: make(map[Type]Service), + logger: *utils.Logger(), + } } // Register registers new service to service store. -func (m *Manager) Register(t Type, service Interface) { +func (m *Manager) Register(t Type, service Service) { utils.Logger().Info().Int("service", int(t)).Msg("Register Service") - if m.services == nil { - m.services = make(map[Type]Interface) - } - if _, ok := m.services[t]; ok { - utils.Logger().Error().Int("servie", int(t)).Msg("This service is already included") + if _, ok := m.serviceMap[t]; ok { + utils.Logger().Error().Int("service", int(t)).Msg("This service is already included") return } - m.services[t] = service -} - -// SetupServiceManager inits service map and start service manager. -func (m *Manager) SetupServiceManager() { - m.InitServiceMap() - m.actionChannel = m.StartServiceManager() + m.services = append(m.services, service) + m.serviceMap[t] = service } -// RegisterService is used for testing. -func (m *Manager) RegisterService(t Type, service Interface) { - m.Register(t, service) +// GetServices returns all registered services. +func (m *Manager) GetServices() []Service { + return m.services } -// InitServiceMap initializes service map. -func (m *Manager) InitServiceMap() { - m.services = make(map[Type]Interface) -} +// StartServices run all registered services. If one of the starting service returns +// an error, closing all started services. +func (m *Manager) StartServices() (err error) { + started := make([]Service, 0, len(m.services)) -// TakeAction is how service manager handles the action. 
-func (m *Manager) TakeAction(action *Action) { - if m.services == nil { - utils.Logger().Error().Msg("Service store is not initialized") - return - } - if service, ok := m.services[action.ServiceType]; ok { - switch action.Action { - case Start: - service.StartService() - case Stop: - service.StopService() - case Notify: - service.NotifyService(action.Params) - } - } -} - -// StartServiceManager starts service manager. -func (m *Manager) StartServiceManager() chan *Action { - ch := make(chan *Action) - go func() { - for { - select { - case action := <-ch: - m.TakeAction(action) + defer func() { + if err != nil { + // If error is not nil, closing all services in reverse order + if stopErr := m.stopServices(started); stopErr != nil { + err = fmt.Errorf("%v; %v", err, stopErr) } } }() - return ch -} -// RunServices run registered services. -func (m *Manager) RunServices() { - for serviceType := range m.services { - action := &Action{ - Action: Start, - ServiceType: serviceType, + for _, service := range m.services { + t := m.typeByService(service) + m.logger.Info().Str("type", t.String()).Msg("Starting service") + if err = service.Start(); err != nil { + err = errors.Wrapf(err, "cannot start service [%v]", t.String()) + return err } - m.TakeAction(action) + started = append(started, service) } + return err } -// SetupServiceMessageChan sets up message channel to services. -func (m *Manager) SetupServiceMessageChan( - mapServiceTypeChan map[Type]chan *msg_pb.Message, -) { - for serviceType, service := range m.services { - mapServiceTypeChan[serviceType] = make(chan *msg_pb.Message) - service.SetMessageChan(mapServiceTypeChan[serviceType]) - } +// StopServices stops all services in the reverse order. +func (m *Manager) StopServices() error { + return m.stopServices(m.services) } -// StopService stops service with type t. 
-func (m *Manager) StopService(t Type) { - if service, ok := m.services[t]; ok { - service.StopService() +// stopServices stops given services in the reverse order. +func (m *Manager) stopServices(services []Service) error { + size := len(services) + var rErr error + + for i := size - 1; i >= 0; i-- { + service := services[i] + t := m.typeByService(service) + + m.logger.Info().Str("type", t.String()).Msg("Stopping service") + if err := service.Stop(); err != nil { + err = errors.Wrapf(err, "failed to stop service [%v]", t.String()) + if rErr != nil { + rErr = fmt.Errorf("%v; %v", rErr, err) + } else { + rErr = err + } + } } + return rErr } -// StopServicesByRole stops all service of the given role. -func (m *Manager) StopServicesByRole(liveServices []Type) { - marked := make(map[Type]bool) - for _, s := range liveServices { - marked[s] = true - } - - for t := range m.GetServices() { - if _, ok := marked[t]; !ok { - m.StopService(t) +func (m *Manager) typeByService(target Service) Type { + for t, s := range m.serviceMap { + if s == target { + return t } } + return UnknownService } diff --git a/api/service/manager_test.go b/api/service/manager_test.go index bb2f95986..3b964b3f3 100644 --- a/api/service/manager_test.go +++ b/api/service/manager_test.go @@ -1,30 +1,162 @@ package service import ( + "errors" + "fmt" + "strings" "testing" - msg_pb "github.com/harmony-one/harmony/api/proto/message" - nodeconfig "github.com/harmony-one/harmony/internal/configs/node" + "github.com/ethereum/go-ethereum/rpc" ) -func TestMessageChan(t *testing.T) { - m := &Manager{} - m.SetupServiceManager() - msgChans := make(map[Type]chan *msg_pb.Message) - m.SetupServiceMessageChan(msgChans) +func TestManager_StartServices(t *testing.T) { + tests := []struct { + services []Service + stopped bool + err error + }{ + { + services: []Service{ + makeTestService(0, nil, nil), + makeTestService(1, nil, nil), + makeTestService(2, nil, nil), + }, + stopped: false, + err: nil, + }, + { + services: 
[]Service{ + makeTestService(0, func() error { return errors.New("start error") }, nil), + }, + stopped: true, + err: errors.New("cannot start service [Unknown]: start error"), + }, + { + services: []Service{ + makeTestService(0, nil, nil), + makeTestService(1, nil, nil), + makeTestService(2, func() error { return errors.New("start error") }, nil), + }, + stopped: true, + err: errors.New("cannot start service [Unknown]: start error"), + }, + { + services: []Service{ + makeTestService(0, nil, nil), + makeTestService(1, nil, func() error { return errors.New("stop error") }), + makeTestService(2, func() error { return errors.New("start error") }, nil), + }, + stopped: true, + err: errors.New("cannot start service [Unknown]: start error; failed to stop service [Unknown]: stop error"), + }, + } + for i, test := range tests { + m := &Manager{ + services: test.services, + } + err := m.StartServices() + if assErr := assertError(err, test.err); assErr != nil { + t.Errorf("Test %v: unexpected error: %v", i, assErr) + } + for _, s := range test.services { + ts := s.(*testService) + if ts.started == test.stopped { + t.Errorf("Test %v: [service %v] test status unexpected", i, ts.index) + } + } + } +} + +func TestManager_StopServices(t *testing.T) { + tests := []struct { + services []Service + expErr error + }{ + { + services: []Service{ + makeTestService(0, nil, nil), + makeTestService(1, nil, nil), + makeTestService(2, nil, nil), + }, + expErr: nil, + }, + { + services: []Service{ + makeTestService(0, nil, nil), + makeTestService(1, nil, func() error { return errors.New("expect error") }), + makeTestService(2, nil, func() error { return errors.New("expect error") }), + }, + expErr: errors.New("failed to stop service [Unknown]: expect error; failed to stop service [Unknown]: expect error"), + }, + } + + for i, test := range tests { + m := &Manager{ + services: test.services, + } + err := m.StopServices() + if assErr := assertError(err, test.expErr); assErr != nil { + 
t.Errorf("Test %v: %v", i, assErr) + } + for _, s := range test.services { + ts := s.(*testService) + if ts.started { + t.Errorf("Test %v: Service%v not stopped", i, ts.index) + } + } + } + +} + +type testService struct { + index int + started bool + startErrHook func() error + stopErrHook func() error +} + +func makeTestService(index int, startErrHook, stopErrHook func() error) *testService { + return &testService{ + index: index, + startErrHook: startErrHook, + stopErrHook: stopErrHook, + } +} + +func (s *testService) Start() error { + if s.startErrHook != nil { + if err := s.startErrHook(); err != nil { + return err + } + } + s.started = true + return nil } -func TestInit(t *testing.T) { - if GroupIDShards[nodeconfig.ShardID(0)] != nodeconfig.NewGroupIDByShardID(0) { - t.Errorf("GroupIDShards[0]: %v != GroupIDBeacon: %v", - GroupIDShards[nodeconfig.ShardID(0)], - nodeconfig.NewGroupIDByShardID(0), - ) +func (s *testService) Stop() error { + if s.stopErrHook != nil { + if err := s.stopErrHook(); err != nil { + s.started = false + return err + } + } + s.started = false + return nil +} + +func (s *testService) APIs() []rpc.API { + return nil +} + +func assertError(got, expect error) error { + if (got == nil) != (expect == nil) { + return fmt.Errorf("unexpected error [%v] / [%v]", got, expect) + } + if (got == nil) || (expect == nil) { + return nil } - if len(GroupIDShards) != nodeconfig.MaxShards { - t.Errorf("len(GroupIDShards): %v != TotalShards: %v", - len(GroupIDShards), - nodeconfig.MaxShards, - ) + if !strings.Contains(got.Error(), expect.Error()) { + return fmt.Errorf("unexpected error [%v] / [%v]", got, expect) } + return nil } diff --git a/api/service/networkinfo/service.go b/api/service/networkinfo/service.go index 5d8297e81..5a7f411e9 100644 --- a/api/service/networkinfo/service.go +++ b/api/service/networkinfo/service.go @@ -109,15 +109,16 @@ func MustNew( return service } -// StartService starts network info service. 
-func (s *Service) StartService() { +// Start starts network info service. +func (s *Service) Start() error { err := s.Init() if err != nil { utils.Logger().Error().Err(err).Msg("Service Init Failed") - return + return nil } s.Run() s.started = true + return nil } // Init initializes role conversion service. @@ -277,27 +278,20 @@ func (s *Service) findPeers(ctx context.Context) { utils.Logger().Info().Msg("PeerInfo Channel Closed") } -// StopService stops network info service. -func (s *Service) StopService() { +// Stop stops network info service. +func (s *Service) Stop() error { utils.Logger().Info().Msg("Stopping network info service") defer s.cancel() if !s.started { utils.Logger().Info().Msg("Service didn't started. Exit") - return + return nil } s.stopChan <- struct{}{} <-s.stoppedChan utils.Logger().Info().Msg("Network info service stopped") -} - -// NotifyService notify service -func (s *Service) NotifyService(params map[string]interface{}) {} - -// SetMessageChan sets up message channel to service. -func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) { - s.messageChan = messageChan + return nil } // APIs for the services. 
diff --git a/api/service/networkinfo/service_test.go b/api/service/networkinfo/service_test.go index 0ce342310..08232a30c 100644 --- a/api/service/networkinfo/service_test.go +++ b/api/service/networkinfo/service_test.go @@ -31,7 +31,7 @@ func TestService(t *testing.T) { t.Fatalf("New() failed: %s", err) } - s.StartService() + s.Start() time.Sleep(2 * time.Second) - s.StopService() + s.Stop() } diff --git a/api/service/prometheus/service.go b/api/service/prometheus/service.go index d5861c812..f36b2ba54 100644 --- a/api/service/prometheus/service.go +++ b/api/service/prometheus/service.go @@ -11,6 +11,7 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/rpc" "github.com/harmony-one/harmony/internal/utils" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -43,6 +44,8 @@ type Service struct { pusher *push.Pusher failStatus error config Config + + registryOnce sync.Once } // Handler represents a path and handler func to serve on the same port as /metrics, /healthz, /goroutinez, etc. @@ -52,8 +55,8 @@ type Handler struct { } var ( - registryOnce sync.Once - svc = &Service{} + initOnce sync.Once + svc = &Service{} ) func (s *Service) getJobName() string { @@ -76,11 +79,18 @@ func (s *Service) getJobName() string { } // NewService sets up a new instance for a given address host:port. -// An empty host will match with any IP so an address like ":19000" is perfectly acceptable. -func NewService(cfg Config, additionalHandlers ...Handler) { +// An empty host will match with any IP so an address like ":19000" is perfectly acceptable. +func NewService(cfg Config, additionalHandlers ...Handler) *Service { + initOnce.Do(func() { + svc = newService(cfg, additionalHandlers...)
+ }) + return svc +} + +func newService(cfg Config, additionalHandlers ...Handler) *Service { if !cfg.Enabled { utils.Logger().Info().Msg("Prometheus http server disabled...") - return + return nil } utils.Logger().Debug().Str("cfg", cfg.String()).Msg("Prometheus") @@ -107,26 +117,11 @@ func NewService(cfg Config, additionalHandlers ...Handler) { Msg("Starting Prometheus server") endpoint := fmt.Sprintf("%s:%d", svc.config.IP, svc.config.Port) svc.server = &http.Server{Addr: endpoint, Handler: mux} - svc.Start() -} - -// StopService stop the Prometheus service -func StopService() error { - return svc.Stop() + return svc } -func (s *Service) goroutinezHandler(w http.ResponseWriter, _ *http.Request) { - stack := debug.Stack() - if _, err := w.Write(stack); err != nil { - utils.Logger().Error().Err(err).Msg("Failed to write goroutines stack") - } - if err := pprof.Lookup("goroutine").WriteTo(w, 2); err != nil { - utils.Logger().Error().Err(err).Msg("Failed to write pprof goroutines") - } -} - -// Start the prometheus service. -func (s *Service) Start() { +// Start start the prometheus service +func (s *Service) Start() error { go func() { utils.Logger().Info().Str("address", s.server.Addr).Msg("Starting prometheus service") err := s.server.ListenAndServe() @@ -158,15 +153,26 @@ func (s *Service) Start() { } }(s) } + return nil } -// Stop the service gracefully. +// Stop stop the Prometheus service func (s *Service) Stop() error { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() return s.server.Shutdown(ctx) } +func (s *Service) goroutinezHandler(w http.ResponseWriter, _ *http.Request) { + stack := debug.Stack() + if _, err := w.Write(stack); err != nil { + utils.Logger().Error().Err(err).Msg("Failed to write goroutines stack") + } + if err := pprof.Lookup("goroutine").WriteTo(w, 2); err != nil { + utils.Logger().Error().Err(err).Msg("Failed to write pprof goroutines") + } +} + // Status checks for any service failure conditions. 
func (s *Service) Status() error { if s.failStatus != nil { @@ -175,12 +181,21 @@ func (s *Service) Status() error { return nil } -// PromRegistry return the registry of prometheus service -func PromRegistry() *prometheus.Registry { - registryOnce.Do(func() { +// APIs returns the RPC apis of the prometheus service +func (s *Service) APIs() []rpc.API { + return nil +} + +func (s *Service) getRegistry() *prometheus.Registry { + s.registryOnce.Do(func() { if svc.registry == nil { svc.registry = prometheus.NewRegistry() } }) - return svc.registry + return s.registry +} + +// PromRegistry return the registry of prometheus service +func PromRegistry() *prometheus.Registry { + return svc.getRegistry() } diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 3bab421df..0c862f306 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -300,34 +300,6 @@ func setupNodeAndRun(hc harmonyConfig) { utils.Logger().Warn().Err(err).Msg("Check Local Time Accuracy Error") } - // Prepare for graceful shutdown from os signals - osSignal := make(chan os.Signal) - signal.Notify(osSignal, os.Interrupt, syscall.SIGTERM) - go func(node *node.Node) { - for sig := range osSignal { - if sig == syscall.SIGTERM || sig == os.Interrupt { - utils.Logger().Warn().Str("signal", sig.String()).Msg("Gracefully shutting down...") - const msg = "Got %s signal. 
Gracefully shutting down...\n" - fmt.Fprintf(os.Stderr, msg, sig) - // stop block proposal service for leader - if node.Consensus.IsLeader() { - node.ServiceManager().StopService(service.BlockProposal) - } - if node.Consensus.Mode() == consensus.Normal { - phase := node.Consensus.GetConsensusPhase() - utils.Logger().Warn().Str("phase", phase).Msg("[shutdown] commit phase has to wait") - maxWait := time.Now().Add(2 * node.Consensus.BlockPeriod) // wait up to 2 * blockperiod in commit phase - for time.Now().Before(maxWait) && - node.Consensus.GetConsensusPhase() == "Commit" { - utils.Logger().Warn().Msg("[shutdown] wait for consensus finished") - time.Sleep(time.Millisecond * 100) - } - } - currentNode.ShutDown() - } - } - }(currentNode) - // Parse RPC config nodeConfig.RPCServer = nodeconfig.RPCServerConfig{ HTTPEnabled: hc.HTTP.Enabled, @@ -394,22 +366,22 @@ func setupNodeAndRun(hc harmonyConfig) { nodeconfig.SetPeerID(myHost.GetID()) - prometheusConfig := prometheus.Config{ - Enabled: hc.Prometheus.Enabled, - IP: hc.Prometheus.IP, - Port: hc.Prometheus.Port, - EnablePush: hc.Prometheus.EnablePush, - Gateway: hc.Prometheus.Gateway, - Network: hc.Network.NetworkType, - Legacy: hc.General.NoStaking, - NodeType: hc.General.NodeType, - Shard: nodeConfig.ShardID, - Instance: myHost.GetID().Pretty(), + if currentNode.NodeConfig.Role() == nodeconfig.Validator { + currentNode.RegisterValidatorServices() + } else if currentNode.NodeConfig.Role() == nodeconfig.ExplorerNode { + currentNode.RegisterExplorerServices() + } + if hc.Prometheus.Enabled { + setupPrometheusService(currentNode, hc, nodeConfig.ShardID) } + // TODO: replace this legacy syncing currentNode.SupportSyncing() - currentNode.ServiceManagerSetup() - currentNode.RunServices() + + if err := currentNode.StartServices(); err != nil { + fmt.Fprint(os.Stderr, err.Error()) + os.Exit(-1) + } if err := currentNode.StartRPC(); err != nil { utils.Logger().Warn(). 
@@ -423,23 +395,21 @@ func setupNodeAndRun(hc harmonyConfig) { Msg("Start Rosetta failed") } - if err := currentNode.StartPrometheus(prometheusConfig); err != nil { - utils.Logger().Warn(). - Err(err). - Msg("Start Prometheus failed") - } + go listenOSSigAndShutDown(currentNode) if err := currentNode.BootstrapConsensus(); err != nil { - fmt.Println("could not bootstrap consensus", err.Error()) + fmt.Fprint(os.Stderr, "could not bootstrap consensus", err.Error()) if !currentNode.NodeConfig.IsOffline { os.Exit(-1) } } - if err := currentNode.Start(); err != nil { - fmt.Println("could not begin network message handling for node", err.Error()) + if err := currentNode.StartPubSub(); err != nil { + fmt.Fprint(os.Stderr, "could not begin network message handling for node", err.Error()) os.Exit(-1) } + + select {} } func nodeconfigSetShardSchedule(config harmonyConfig) { @@ -707,6 +677,23 @@ func setupConsensusAndNode(hc harmonyConfig, nodeConfig *nodeconfig.ConfigType) return currentNode } +func setupPrometheusService(node *node.Node, hc harmonyConfig, sid uint32) { + prometheusConfig := prometheus.Config{ + Enabled: hc.Prometheus.Enabled, + IP: hc.Prometheus.IP, + Port: hc.Prometheus.Port, + EnablePush: hc.Prometheus.EnablePush, + Gateway: hc.Prometheus.Gateway, + Network: hc.Network.NetworkType, + Legacy: hc.General.NoStaking, + NodeType: hc.General.NodeType, + Shard: sid, + Instance: myHost.GetID().Pretty(), + } + p := prometheus.NewService(prometheusConfig) + node.RegisterService(service.Prometheus, p) +} + func setupBlacklist(hc harmonyConfig) (map[ethCommon.Address]struct{}, error) { utils.Logger().Debug().Msgf("Using blacklist file at `%s`", hc.TxPool.BlacklistFile) dat, err := ioutil.ReadFile(hc.TxPool.BlacklistFile) @@ -726,3 +713,24 @@ func setupBlacklist(hc harmonyConfig) (map[ethCommon.Address]struct{}, error) { } return addrMap, nil } + +func listenOSSigAndShutDown(node *node.Node) { + // Prepare for graceful shutdown from os signals + osSignal := make(chan 
os.Signal) + signal.Notify(osSignal, syscall.SIGINT, syscall.SIGTERM) + sig := <-osSignal + utils.Logger().Warn().Str("signal", sig.String()).Msg("Gracefully shutting down...") + const msg = "Got %s signal. Gracefully shutting down...\n" + fmt.Fprintf(os.Stderr, msg, sig) + + go node.ShutDown() + + for i := 10; i > 0; i-- { + <-osSignal + if i > 1 { + fmt.Printf("Already shutting down, interrupt more to force quit: (times=%v)\n", i-1) + } + } + fmt.Println("Forced QUIT.") + os.Exit(-1) +} diff --git a/consensus/consensus_v2.go b/consensus/consensus_v2.go index 3036becb7..a4bd9024f 100644 --- a/consensus/consensus_v2.go +++ b/consensus/consensus_v2.go @@ -469,6 +469,23 @@ func (consensus *Consensus) Start( }() } +// Close closes the consensus. If it is currently in the normal commit phase, wait until the commit +// phase ends (for at most two block periods). +func (consensus *Consensus) Close() error { + if consensus.Mode() != Normal || consensus.phase != FBFTCommit { + return nil + } + // We only need to wait when consensus is in the normal commit phase + utils.Logger().Warn().Str("phase", consensus.phase.String()).Msg("[shutdown] commit phase has to wait") + + maxWait := time.Now().Add(2 * consensus.BlockPeriod) + for time.Now().Before(maxWait) && consensus.GetConsensusPhase() == "Commit" { + utils.Logger().Warn().Msg("[shutdown] wait for consensus finished") + time.Sleep(time.Millisecond * 100) + } + return nil +} + // LastMileBlockIter is the iterator to iterate over the last mile blocks in consensus cache. // All blocks returned are guaranteed to pass the verification.
type LastMileBlockIter struct { diff --git a/node/api.go b/node/api.go index 825b5a2cf..5c677bfa1 100644 --- a/node/api.go +++ b/node/api.go @@ -2,7 +2,6 @@ package node import ( "github.com/ethereum/go-ethereum/rpc" - "github.com/harmony-one/harmony/api/service/prometheus" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/hmy" "github.com/harmony-one/harmony/rosetta" @@ -82,17 +81,6 @@ func (node *Node) StopRPC() error { return hmy_rpc.StopServers() } -// StartPrometheus start promtheus metrics service -func (node *Node) StartPrometheus(cfg prometheus.Config) error { - prometheus.NewService(cfg) - return nil -} - -// StopPrometheus stop prometheus metrics service -func (node *Node) StopPrometheus() error { - return prometheus.StopService() -} - // StartRosetta start rosetta service func (node *Node) StartRosetta() error { harmony := hmy.New(node, node.TxPool, node.CxPool, node.Consensus.ShardID) diff --git a/node/node.go b/node/node.go index 8a4a204c2..2f12f30dc 100644 --- a/node/node.go +++ b/node/node.go @@ -106,7 +106,6 @@ type Node struct { // Chain configuration. chainConfig params.ChainConfig // map of service type to its message channel. - serviceMessageChan map[service.Type]chan *msg_pb.Message isFirstTime bool // the node was started with a fresh database unixTimeAtNodeStart int64 // KeysToAddrs holds the addresses of bls keys run by the node @@ -125,6 +124,10 @@ type Node struct { committeeCache *lru.Cache Metrics metrics.Registry + + // context control for pub-sub handling + psCtx context.Context + psCancel func() } // Blockchain returns the blockchain for the node's current shard. 
@@ -548,8 +551,10 @@ var ( errConsensusMessageOnUnexpectedTopic = errors.New("received consensus on wrong topic") ) -// Start kicks off the node message handling -func (node *Node) Start() error { +// StartPubSub kicks off the node message handling +func (node *Node) StartPubSub() error { + node.psCtx, node.psCancel = context.WithCancel(context.Background()) + // groupID and whether this topic is used for consensus type t struct { tp nodeconfig.GroupID @@ -725,10 +730,10 @@ func (node *Node) Start() error { nodeP2PMessageCounterVec.With(prometheus.Labels{"type": "ignored"}).Inc() return libp2p_pubsub.ValidationReject } - select { case <-ctx.Done(): - if errors.Is(ctx.Err(), context.DeadlineExceeded) { + if errors.Is(ctx.Err(), context.DeadlineExceeded) || + errors.Is(ctx.Err(), context.Canceled) { utils.Logger().Warn(). Str("topic", topicNamed).Msg("[context] exceeded validation deadline") } @@ -740,6 +745,7 @@ func (node *Node) Start() error { return libp2p_pubsub.ValidationReject }, // WithValidatorTimeout is an option that sets a timeout for an (asynchronous) topic validator. By default there is no timeout in asynchronous validators. + // TODO: Currently this timeout is useless. Verify me. 
libp2p_pubsub.WithValidatorTimeout(250*time.Millisecond), // WithValidatorConcurrency set the concurernt validator, default is 1024 libp2p_pubsub.WithValidatorConcurrency(p2p.SetAsideForConsensus), @@ -756,40 +762,47 @@ func (node *Node) Start() error { // goroutine to handle consensus messages go func() { - for m := range msgChanConsensus { - // should not take more than 10 seconds to process one message - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - msg := m - go func() { - defer cancel() - - if semConsensus.TryAcquire(1) { - defer semConsensus.Release(1) - - if isThisNodeAnExplorerNode { - if err := node.explorerMessageHandler( - ctx, msg.handleCArg, - ); err != nil { - errChan <- withError{err, nil} - } - } else { - if err := msg.handleC(ctx, msg.handleCArg, msg.senderPubKey); err != nil { - errChan <- withError{err, msg.senderPubKey} + for { + select { + case <-node.psCtx.Done(): + return + case m := <-msgChanConsensus: + // should not take more than 10 seconds to process one message + ctx, cancel := context.WithTimeout(node.psCtx, 10*time.Second) + msg := m + go func() { + defer cancel() + + if semConsensus.TryAcquire(1) { + defer semConsensus.Release(1) + + if isThisNodeAnExplorerNode { + if err := node.explorerMessageHandler( + ctx, msg.handleCArg, + ); err != nil { + errChan <- withError{err, nil} + } + } else { + if err := msg.handleC(ctx, msg.handleCArg, msg.senderPubKey); err != nil { + errChan <- withError{err, msg.senderPubKey} + } } } - } - select { - case <-ctx.Done(): - if errors.Is(ctx.Err(), context.DeadlineExceeded) { - utils.Logger().Warn(). - Str("topic", topicNamed).Msg("[context] exceeded consensus message handler deadline") + select { + // FIXME: wrong use of context. This message have already passed handle actually. + case <-ctx.Done(): + if errors.Is(ctx.Err(), context.DeadlineExceeded) || + errors.Is(ctx.Err(), context.Canceled) { + utils.Logger().Warn(). 
+ Str("topic", topicNamed).Msg("[context] exceeded consensus message handler deadline") + } + errChan <- withError{errors.WithStack(ctx.Err()), nil} + default: + return } - errChan <- withError{errors.WithStack(ctx.Err()), nil} - default: - return - } - }() + }() + } } }() @@ -798,39 +811,46 @@ func (node *Node) Start() error { // goroutine to handle node messages go func() { - for m := range msgChanNode { - // should not take more than 10 seconds to process one message - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - msg := m - go func() { - defer cancel() - if semNode.TryAcquire(1) { - defer semNode.Release(1) - - if err := msg.handleE(ctx, msg.handleEArg, msg.actionType); err != nil { - errChan <- withError{err, nil} + for { + select { + case m := <-msgChanNode: + ctx, cancel := context.WithTimeout(node.psCtx, 10*time.Second) + msg := m + go func() { + defer cancel() + if semNode.TryAcquire(1) { + defer semNode.Release(1) + + if err := msg.handleE(ctx, msg.handleEArg, msg.actionType); err != nil { + errChan <- withError{err, nil} + } } - } - select { - case <-ctx.Done(): - if errors.Is(ctx.Err(), context.DeadlineExceeded) { - utils.Logger().Warn(). - Str("topic", topicNamed).Msg("[context] exceeded node message handler deadline") + select { + case <-ctx.Done(): + if errors.Is(ctx.Err(), context.DeadlineExceeded) || + errors.Is(ctx.Err(), context.Canceled) { + utils.Logger().Warn(). 
+ Str("topic", topicNamed).Msg("[context] exceeded node message handler deadline") + } + errChan <- withError{errors.WithStack(ctx.Err()), nil} + default: + return } - errChan <- withError{errors.WithStack(ctx.Err()), nil} - default: - return - } - }() + }() + case <-node.psCtx.Done(): + return + } } }() go func() { - for { - nextMsg, err := sub.Next(context.Background()) + nextMsg, err := sub.Next(node.psCtx) if err != nil { + if err == context.Canceled { + return + } errChan <- withError{errors.WithStack(err), nil} continue } @@ -855,14 +875,27 @@ func (node *Node) Start() error { }() } - for e := range errChan { - utils.SampledLogger().Info(). - Interface("item", e.payload). - Msgf("[p2p]: issue while handling incoming p2p message: %v", e.err) - } - // NOTE never gets here + go func() { + for { + select { + case <-node.psCtx.Done(): + return + case e := <-errChan: + utils.SampledLogger().Info(). + Interface("item", e.payload). + Msgf("[p2p]: issue while handling incoming p2p message: %v", e.err) + } + } + }() + return nil +} +// StopPubSub stops the pubsub handling +func (node *Node) StopPubSub() { + if node.psCancel != nil { + node.psCancel() + } } // GetSyncID returns the syncID of this node @@ -1004,6 +1037,8 @@ func New( initMetrics() nodeStringCounterVec.WithLabelValues("version", nodeconfig.GetVersion()).Inc() + node.serviceManager = service.NewManager() + return &node } @@ -1136,6 +1171,30 @@ func (node *Node) ServiceManager() *service.Manager { func (node *Node) ShutDown() { node.Blockchain().Stop() node.Beaconchain().Stop() + + if err := node.StopRPC(); err != nil { + utils.Logger().Error().Err(err).Msg("failed to stop RPC") + } + + utils.Logger().Info().Msg("stopping rosetta") + if err := node.StopRosetta(); err != nil { + utils.Logger().Error().Err(err).Msg("failed to stop rosetta") + } + + utils.Logger().Info().Msg("stopping services") + if err := node.StopServices(); err != nil { + utils.Logger().Error().Err(err).Msg("failed to stop services") + } + 
+ // Currently pubSub needs to be stopped after consensus. + utils.Logger().Info().Msg("stopping pub-sub") + node.StopPubSub() + + utils.Logger().Info().Msg("stopping host") + if err := node.host.Close(); err != nil { + utils.Logger().Error().Err(err).Msg("failed to stop p2p host") + } + const msg = "Successfully shut down!\n" utils.Logger().Print(msg) fmt.Print(msg) diff --git a/node/node_newblock.go b/node/node_newblock.go index 4d885b31e..30c86c7a5 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -34,7 +34,11 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan consensus.ProposalTyp utils.Logger().Debug(). Msg("Waiting for Consensus ready") - time.Sleep(30 * time.Second) // Wait for other nodes to be ready (test-only) + select { + case <-time.After(30 * time.Second): + case <-stopChan: + return + } for { // keep waiting for Consensus ready diff --git a/node/service_setup.go b/node/service_setup.go index ed55607db..d4cf400c8 100644 --- a/node/service_setup.go +++ b/node/service_setup.go @@ -3,81 +3,64 @@ package node import ( "fmt" - msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/api/service" "github.com/harmony-one/harmony/api/service/blockproposal" "github.com/harmony-one/harmony/api/service/consensus" "github.com/harmony-one/harmony/api/service/explorer" "github.com/harmony-one/harmony/api/service/networkinfo" - nodeconfig "github.com/harmony-one/harmony/internal/configs/node" - "github.com/harmony-one/harmony/internal/utils" ) -func (node *Node) setupForValidator() { +// RegisterValidatorServices registers the validator services. +func (node *Node) RegisterValidatorServices() { _, chanPeer, _ := node.initNodeConfiguration() // Register networkinfo service.
"0" is the beacon shard ID - node.serviceManager.RegisterService( + node.serviceManager.Register( service.NetworkInfo, networkinfo.MustNew( node.host, node.NodeConfig.GetShardGroupID(), chanPeer, nil, node.networkInfoDHTPath(), ), ) // Register consensus service. - node.serviceManager.RegisterService( + node.serviceManager.Register( service.Consensus, consensus.New(node.BlockChannel, node.Consensus, node.startConsensus), ) // Register new block service. - node.serviceManager.RegisterService( + node.serviceManager.Register( service.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.Consensus.CommitSigChannel, node.WaitForConsensusReadyV2), ) } -func (node *Node) setupForExplorerNode() { +// RegisterExplorerServices registers the explorer services. +func (node *Node) RegisterExplorerServices() { _, chanPeer, _ := node.initNodeConfiguration() // Register networkinfo service. - node.serviceManager.RegisterService( + node.serviceManager.Register( service.NetworkInfo, networkinfo.MustNew( node.host, node.NodeConfig.GetShardGroupID(), chanPeer, nil, node.networkInfoDHTPath()), ) // Register explorer service. - node.serviceManager.RegisterService( + node.serviceManager.Register( service.SupportExplorer, explorer.New(&node.SelfPeer, node.stateSync, node.Blockchain()), ) } -// ServiceManagerSetup setups service store. -func (node *Node) ServiceManagerSetup() { - node.serviceManager = &service.Manager{} - node.serviceMessageChan = make(map[service.Type]chan *msg_pb.Message) - switch node.NodeConfig.Role() { - case nodeconfig.Validator: - node.setupForValidator() - case nodeconfig.ExplorerNode: - node.setupForExplorerNode() - } - node.serviceManager.SetupServiceMessageChan(node.serviceMessageChan) +// RegisterService registers a service with the node's service manager. +func (node *Node) RegisterService(st service.Type, s service.Service) { + node.serviceManager.Register(st, s) } -// RunServices runs registered services. 
-func (node *Node) RunServices() { - if node.serviceManager == nil { - utils.Logger().Info().Msg("Service manager is not set up yet.") - return - } - node.serviceManager.RunServices() +// StartServices starts the services registered with the node's service manager. +func (node *Node) StartServices() error { + return node.serviceManager.StartServices() } // StopServices runs registered services. -func (node *Node) StopServices() { - if node.serviceManager == nil { - utils.Logger().Info().Msg("Service manager is not set up yet.") - return - } - node.serviceManager.StopServicesByRole([]service.Type{}) +func (node *Node) StopServices() error { + return node.serviceManager.StopServices() } func (node *Node) networkInfoDHTPath() string { diff --git a/p2p/host.go b/p2p/host.go index 47b0312c5..a99e7e1be 100644 --- a/p2p/host.go +++ b/p2p/host.go @@ -27,6 +27,9 @@ import ( // Host is the client + server in p2p network. type Host interface { + Start() error + Close() error + GetSelfPeer() Peer AddPeer(*Peer) error GetID() libp2p_peer.ID @@ -162,6 +165,16 @@ func (host *HostV2) PubSub() *libp2p_pubsub.PubSub { return host.pubsub } +// Start is currently a placeholder: it does nothing and returns nil. +func (host *HostV2) Start() error { + return nil +} + +// Close is currently a placeholder: it does nothing and returns nil. +func (host *HostV2) Close() error { + return nil +} + // C .. -> (total known peers, connected, not connected) func (host *HostV2) C() (int, int, int) { connected, not := 0, 0 diff --git a/rosetta/rosetta.go b/rosetta/rosetta.go index e22130969..2b08e2bb0 100644 --- a/rosetta/rosetta.go +++ b/rosetta/rosetta.go @@ -59,6 +59,9 @@ func StartServers(hmy *hmy.Harmony, config nodeconfig.RosettaServerConfig) error // StopServers stops the rosetta http server func StopServers() error { + if listener == nil { + return nil + } if err := listener.Close(); err != nil { return err }