Graceful shutdown - All registered services are shut down gracefully. (#3533)

1. Refactored the service manager with a cleaner interface (see the sketch below).
2. Added Prometheus to the service manager.
3. Added graceful shutdown of all services (including consensus).
4. Refactored some code around consensus graceful shutdown.
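In outline, the new service lifecycle looks roughly like this (a minimal sketch under assumed names, not the exact Harmony code; the real interface in api/service/manager.go below also carries APIs() []rpc.API and rolls back on start failure):

```go
// Minimal sketch of the refactored lifecycle: one small interface per
// service; the manager starts in registration order, stops in reverse.
package sketch

type Service interface {
	Start() error
	Stop() error
}

type Manager struct{ services []Service }

// Register appends a service; start order is registration order.
func (m *Manager) Register(s Service) { m.services = append(m.services, s) }

// StartServices starts every registered service in order.
func (m *Manager) StartServices() error {
	for _, s := range m.services {
		if err := s.Start(); err != nil {
			return err
		}
	}
	return nil
}

// StopServices stops every registered service in reverse order.
func (m *Manager) StopServices() {
	for i := len(m.services) - 1; i >= 0; i-- {
		m.services[i].Stop() // sketch: stop errors ignored here
	}
}
```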

Co-authored-by: Rongjian Lan <rongjian.lan@gmail.com>
pull/3542/head
Jacky Wang 4 years ago committed by GitHub
parent 6833b446fe
commit f00c90e3e6
  1. api/service/blockproposal/service.go (28 lines changed)
  2. api/service/consensus/service.go (18 lines changed)
  3. api/service/explorer/service.go (10 lines changed)
  4. api/service/manager.go (187 lines changed)
  5. api/service/manager_test.go (168 lines changed)
  6. api/service/networkinfo/service.go (22 lines changed)
  7. api/service/networkinfo/service_test.go (4 lines changed)
  8. api/service/prometheus/service.go (71 lines changed)
  9. cmd/harmony/main.go (106 lines changed)
  10. consensus/consensus_v2.go (17 lines changed)
  11. node/api.go (12 lines changed)
  12. node/node.go (189 lines changed)
  13. node/node_newblock.go (6 lines changed)
  14. node/service_setup.go (51 lines changed)
  15. p2p/host.go (13 lines changed)
  16. rosetta/rosetta.go (3 lines changed)

@ -22,38 +22,26 @@ func New(readySignal chan consensus.ProposalType, commitSigsChan chan []byte, wa
return &Service{readySignal: readySignal, commitSigsChan: commitSigsChan, waitForConsensusReady: waitForConsensusReady}
}
// StartService starts block proposal service.
func (s *Service) StartService() {
// Start starts block proposal service.
func (s *Service) Start() error {
s.stopChan = make(chan struct{})
s.stoppedChan = make(chan struct{})
s.Init()
s.Run(s.stopChan, s.stoppedChan)
}
// Init initializes block proposal service.
func (s *Service) Init() {
s.run(s.stopChan, s.stoppedChan)
return nil
}
// Run runs block proposal.
func (s *Service) Run(stopChan chan struct{}, stoppedChan chan struct{}) {
func (s *Service) run(stopChan chan struct{}, stoppedChan chan struct{}) {
s.waitForConsensusReady(s.readySignal, s.commitSigsChan, s.stopChan, s.stoppedChan)
}
// StopService stops block proposal service.
func (s *Service) StopService() {
// Stop stops block proposal service.
func (s *Service) Stop() error {
utils.Logger().Info().Msg("Stopping block proposal service.")
s.stopChan <- struct{}{}
<-s.stoppedChan
utils.Logger().Info().Msg("Role conversion stopped.")
}
// NotifyService notify service
func (s *Service) NotifyService(params map[string]interface{}) {}
// SetMessageChan sets up message channel to service.
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
return nil
}
// APIs for the services.
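The service keeps the stopChan/stoppedChan handshake. Reduced to its core, the pattern looks like this (a sketch, not the Harmony code):

```go
// Sketch of the stopChan/stoppedChan handshake: Stop signals the worker
// goroutine and blocks until the worker acknowledges it has exited.
package sketch

type worker struct {
	stopChan    chan struct{}
	stoppedChan chan struct{}
}

func (w *worker) Start() error {
	w.stopChan = make(chan struct{})
	w.stoppedChan = make(chan struct{})
	go func() {
		defer close(w.stoppedChan) // acknowledge shutdown
		for {
			select {
			case <-w.stopChan:
				return
			default:
				// ... one unit of work ...
			}
		}
	}()
	return nil
}

func (w *worker) Stop() error {
	w.stopChan <- struct{}{} // request shutdown
	<-w.stoppedChan          // wait until the worker has exited
	return nil
}
```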

@ -23,29 +23,23 @@ func New(blockChannel chan *types.Block, consensus *consensus.Consensus, startCh
return &Service{blockChannel: blockChannel, consensus: consensus, startChan: startChan}
}
// StartService starts consensus service.
func (s *Service) StartService() {
// Start starts consensus service.
func (s *Service) Start() error {
utils.Logger().Info().Msg("[consensus/service] Starting consensus service.")
s.stopChan = make(chan struct{})
s.stoppedChan = make(chan struct{})
s.consensus.Start(s.blockChannel, s.stopChan, s.stoppedChan, s.startChan)
s.consensus.WaitForNewRandomness()
return nil
}
// StopService stops consensus service.
func (s *Service) StopService() {
// Stop stops consensus service.
func (s *Service) Stop() error {
utils.Logger().Info().Msg("Stopping consensus service.")
s.stopChan <- struct{}{}
<-s.stoppedChan
utils.Logger().Info().Msg("Consensus service stopped.")
}
// NotifyService notify service
func (s *Service) NotifyService(params map[string]interface{}) {}
// SetMessageChan sets up message channel to service.
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
return s.consensus.Close()
}
// APIs for the services.

@ -54,21 +54,23 @@ func New(selfPeer *p2p.Peer, ss *syncing.StateSync, bc *core.BlockChain) *Servic
return &Service{IP: selfPeer.IP, Port: selfPeer.Port, stateSync: ss, blockchain: bc}
}
// StartService starts explorer service.
func (s *Service) StartService() {
// Start starts explorer service.
func (s *Service) Start() error {
utils.Logger().Info().Msg("Starting explorer service.")
s.Init()
s.server = s.Run()
return nil
}
// StopService shutdowns explorer service.
func (s *Service) StopService() {
// Stop shuts down the explorer service.
func (s *Service) Stop() error {
utils.Logger().Info().Msg("Shutting down explorer service.")
if err := s.server.Shutdown(context.Background()); err != nil {
utils.Logger().Error().Err(err).Msg("Error when shutting down explorer server")
} else {
utils.Logger().Info().Msg("Shutting down explorer server successfully")
}
return nil
}
// GetExplorerPort returns the port serving explorer dashboard. This port is explorerPortDifference less than the node port.
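Stop relies on net/http's built-in graceful shutdown. A sketch of the same call with a deadline (as the Prometheus service further down uses), so Stop cannot block forever:

```go
// Sketch: http.Server.Shutdown stops accepting new connections and waits
// for in-flight requests to finish; the context bounds the wait.
package sketch

import (
	"context"
	"net/http"
	"time"
)

func stopServer(srv *http.Server) error {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	return srv.Shutdown(ctx) // returns ctx.Err() if the deadline expires
}
```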

@ -1,19 +1,12 @@
package service
import (
"fmt"
"github.com/ethereum/go-ethereum/rpc"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/internal/utils"
)
// ActionType is the input for Service Manager to operate.
type ActionType byte
// Constants for Action Type.
const (
Start ActionType = iota
Stop
Notify
"github.com/pkg/errors"
"github.com/rs/zerolog"
)
// Type is service type.
@ -21,11 +14,13 @@ type Type byte
// Constants for Type.
const (
ClientSupport Type = iota
UnknownService Type = iota
ClientSupport
SupportExplorer
Consensus
BlockProposal
NetworkInfo
Prometheus
)
func (t Type) String() string {
@ -40,139 +35,111 @@ func (t Type) String() string {
return "BlockProposal"
case NetworkInfo:
return "NetworkInfo"
case Prometheus:
return "Prometheus"
default:
return "Unknown"
}
}
// Action is type of service action.
type Action struct {
Action ActionType
ServiceType Type
Params map[string]interface{}
}
// Interface is the collection of functions any service needs to implement.
type Interface interface {
StartService()
SetMessageChan(msgChan chan *msg_pb.Message)
StopService()
NotifyService(map[string]interface{})
// APIs retrieves the list of RPC descriptors the service provides
APIs() []rpc.API
// Service is the collection of functions any service needs to implement.
type Service interface {
Start() error
Stop() error
APIs() []rpc.API // the list of RPC descriptors the service provides
}
// Manager stores all services for service manager.
type Manager struct {
services map[Type]Interface
actionChannel chan *Action
services []Service
serviceMap map[Type]Service
logger zerolog.Logger
}
// GetServices returns all registered services.
func (m *Manager) GetServices() map[Type]Interface {
return m.services
// NewManager creates a new manager
func NewManager() *Manager {
return &Manager{
services: nil,
serviceMap: make(map[Type]Service),
logger: *utils.Logger(),
}
}
// Register registers new service to service store.
func (m *Manager) Register(t Type, service Interface) {
func (m *Manager) Register(t Type, service Service) {
utils.Logger().Info().Int("service", int(t)).Msg("Register Service")
if m.services == nil {
m.services = make(map[Type]Interface)
}
if _, ok := m.services[t]; ok {
utils.Logger().Error().Int("servie", int(t)).Msg("This service is already included")
if _, ok := m.serviceMap[t]; ok {
utils.Logger().Error().Int("service", int(t)).Msg("This service is already included")
return
}
m.services[t] = service
}
// SetupServiceManager inits service map and start service manager.
func (m *Manager) SetupServiceManager() {
m.InitServiceMap()
m.actionChannel = m.StartServiceManager()
m.services = append(m.services, service)
m.serviceMap[t] = service
}
// RegisterService is used for testing.
func (m *Manager) RegisterService(t Type, service Interface) {
m.Register(t, service)
// GetServices returns all registered services.
func (m *Manager) GetServices() []Service {
return m.services
}
// InitServiceMap initializes service map.
func (m *Manager) InitServiceMap() {
m.services = make(map[Type]Interface)
}
// StartServices runs all registered services. If any service fails to start,
// all services that already started are stopped in reverse order.
func (m *Manager) StartServices() (err error) {
started := make([]Service, 0, len(m.services))
// TakeAction is how service manager handles the action.
func (m *Manager) TakeAction(action *Action) {
if m.services == nil {
utils.Logger().Error().Msg("Service store is not initialized")
return
}
if service, ok := m.services[action.ServiceType]; ok {
switch action.Action {
case Start:
service.StartService()
case Stop:
service.StopService()
case Notify:
service.NotifyService(action.Params)
}
}
}
// StartServiceManager starts service manager.
func (m *Manager) StartServiceManager() chan *Action {
ch := make(chan *Action)
go func() {
for {
select {
case action := <-ch:
m.TakeAction(action)
defer func() {
if err != nil {
// If err is not nil, stop all already-started services in reverse order
if stopErr := m.stopServices(started); stopErr != nil {
err = fmt.Errorf("%v; %v", err, stopErr)
}
}
}()
return ch
}
// RunServices run registered services.
func (m *Manager) RunServices() {
for serviceType := range m.services {
action := &Action{
Action: Start,
ServiceType: serviceType,
for _, service := range m.services {
t := m.typeByService(service)
m.logger.Info().Str("type", t.String()).Msg("Starting service")
if err = service.Start(); err != nil {
err = errors.Wrapf(err, "cannot start service [%v]", t.String())
return err
}
m.TakeAction(action)
started = append(started, service)
}
return err
}
// SetupServiceMessageChan sets up message channel to services.
func (m *Manager) SetupServiceMessageChan(
mapServiceTypeChan map[Type]chan *msg_pb.Message,
) {
for serviceType, service := range m.services {
mapServiceTypeChan[serviceType] = make(chan *msg_pb.Message)
service.SetMessageChan(mapServiceTypeChan[serviceType])
}
// StopServices stops all services in the reverse order.
func (m *Manager) StopServices() error {
return m.stopServices(m.services)
}
// StopService stops service with type t.
func (m *Manager) StopService(t Type) {
if service, ok := m.services[t]; ok {
service.StopService()
// stopServices stops given services in the reverse order.
func (m *Manager) stopServices(services []Service) error {
size := len(services)
var rErr error
for i := size - 1; i >= 0; i-- {
service := services[i]
t := m.typeByService(service)
m.logger.Info().Str("type", t.String()).Msg("Stopping service")
if err := service.Stop(); err != nil {
err = errors.Wrapf(err, "failed to stop service [%v]", t.String())
if rErr != nil {
rErr = fmt.Errorf("%v; %v", rErr, err)
} else {
rErr = err
}
}
}
return rErr
}
// StopServicesByRole stops all service of the given role.
func (m *Manager) StopServicesByRole(liveServices []Type) {
marked := make(map[Type]bool)
for _, s := range liveServices {
marked[s] = true
}
for t := range m.GetServices() {
if _, ok := marked[t]; !ok {
m.StopService(t)
func (m *Manager) typeByService(target Service) Type {
for t, s := range m.serviceMap {
if s == target {
return t
}
}
return UnknownService
}
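StartServices uses a named return value plus a deferred closure to roll back. The pattern in isolation (a sketch; startAll is a hypothetical stand-in):

```go
// Sketch: if the function returns a non-nil err, the deferred closure
// stops whatever already started (reverse order) and folds any stop
// error into err via the named return value.
package sketch

import "fmt"

type Service interface {
	Start() error
	Stop() error
}

func startAll(services []Service) (err error) {
	var started []Service
	defer func() {
		if err == nil {
			return
		}
		for i := len(started) - 1; i >= 0; i-- {
			if stopErr := started[i].Stop(); stopErr != nil {
				err = fmt.Errorf("%v; %v", err, stopErr)
			}
		}
	}()
	for _, s := range services {
		if err = s.Start(); err != nil {
			return err
		}
		started = append(started, s)
	}
	return nil
}
```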

@ -1,30 +1,162 @@
package service
import (
"errors"
"fmt"
"strings"
"testing"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/ethereum/go-ethereum/rpc"
)
func TestMessageChan(t *testing.T) {
m := &Manager{}
m.SetupServiceManager()
msgChans := make(map[Type]chan *msg_pb.Message)
m.SetupServiceMessageChan(msgChans)
func TestManager_StartServices(t *testing.T) {
tests := []struct {
services []Service
stopped bool
err error
}{
{
services: []Service{
makeTestService(0, nil, nil),
makeTestService(1, nil, nil),
makeTestService(2, nil, nil),
},
stopped: false,
err: nil,
},
{
services: []Service{
makeTestService(0, func() error { return errors.New("start error") }, nil),
},
stopped: true,
err: errors.New("cannot start service [Unknown]: start error"),
},
{
services: []Service{
makeTestService(0, nil, nil),
makeTestService(1, nil, nil),
makeTestService(2, func() error { return errors.New("start error") }, nil),
},
stopped: true,
err: errors.New("cannot start service [Unknown]: start error"),
},
{
services: []Service{
makeTestService(0, nil, nil),
makeTestService(1, nil, func() error { return errors.New("stop error") }),
makeTestService(2, func() error { return errors.New("start error") }, nil),
},
stopped: true,
err: errors.New("cannot start service [Unknown]: start error; failed to stop service [Unknown]: stop error"),
},
}
for i, test := range tests {
m := &Manager{
services: test.services,
}
err := m.StartServices()
if assErr := assertError(err, test.err); assErr != nil {
t.Errorf("Test %v: unexpected error: %v", i, assErr)
}
for _, s := range test.services {
ts := s.(*testService)
if ts.started == test.stopped {
t.Errorf("Test %v: [service %v] test status unexpected", i, ts.index)
}
}
}
}
func TestManager_StopServices(t *testing.T) {
tests := []struct {
services []Service
expErr error
}{
{
services: []Service{
makeTestService(0, nil, nil),
makeTestService(1, nil, nil),
makeTestService(2, nil, nil),
},
expErr: nil,
},
{
services: []Service{
makeTestService(0, nil, nil),
makeTestService(1, nil, func() error { return errors.New("expect error") }),
makeTestService(2, nil, func() error { return errors.New("expect error") }),
},
expErr: errors.New("failed to stop service [Unknown]: expect error; failed to stop service [Unknown]: expect error"),
},
}
for i, test := range tests {
m := &Manager{
services: test.services,
}
err := m.StopServices()
if assErr := assertError(err, test.expErr); assErr != nil {
t.Errorf("Test %v: %v", i, assErr)
}
for _, s := range test.services {
ts := s.(*testService)
if ts.started {
t.Errorf("Test %v: Service%v not stopped", i, ts.index)
}
}
}
}
type testService struct {
index int
started bool
startErrHook func() error
stopErrHook func() error
}
func makeTestService(index int, startErrHook, stopErrHook func() error) *testService {
return &testService{
index: index,
startErrHook: startErrHook,
stopErrHook: stopErrHook,
}
}
func (s *testService) Start() error {
if s.startErrHook != nil {
if err := s.startErrHook(); err != nil {
return err
}
}
s.started = true
return nil
}
func TestInit(t *testing.T) {
if GroupIDShards[nodeconfig.ShardID(0)] != nodeconfig.NewGroupIDByShardID(0) {
t.Errorf("GroupIDShards[0]: %v != GroupIDBeacon: %v",
GroupIDShards[nodeconfig.ShardID(0)],
nodeconfig.NewGroupIDByShardID(0),
)
func (s *testService) Stop() error {
if s.stopErrHook != nil {
if err := s.stopErrHook(); err != nil {
s.started = false
return err
}
}
s.started = false
return nil
}
func (s *testService) APIs() []rpc.API {
return nil
}
func assertError(got, expect error) error {
if (got == nil) != (expect == nil) {
return fmt.Errorf("unexpected error [%v] / [%v]", got, expect)
}
if (got == nil) || (expect == nil) {
return nil
}
if len(GroupIDShards) != nodeconfig.MaxShards {
t.Errorf("len(GroupIDShards): %v != TotalShards: %v",
len(GroupIDShards),
nodeconfig.MaxShards,
)
if !strings.Contains(got.Error(), expect.Error()) {
return fmt.Errorf("unexpected error [%v] / [%v]", got, expect)
}
return nil
}

@ -109,15 +109,16 @@ func MustNew(
return service
}
// StartService starts network info service.
func (s *Service) StartService() {
// Start starts network info service.
func (s *Service) Start() error {
err := s.Init()
if err != nil {
utils.Logger().Error().Err(err).Msg("Service Init Failed")
return
return nil
}
s.Run()
s.started = true
return nil
}
// Init initializes role conversion service.
@ -277,27 +278,20 @@ func (s *Service) findPeers(ctx context.Context) {
utils.Logger().Info().Msg("PeerInfo Channel Closed")
}
// StopService stops network info service.
func (s *Service) StopService() {
// Stop stops network info service.
func (s *Service) Stop() error {
utils.Logger().Info().Msg("Stopping network info service")
defer s.cancel()
if !s.started {
utils.Logger().Info().Msg("Service didn't started. Exit")
return
return nil
}
s.stopChan <- struct{}{}
<-s.stoppedChan
utils.Logger().Info().Msg("Network info service stopped")
}
// NotifyService notify service
func (s *Service) NotifyService(params map[string]interface{}) {}
// SetMessageChan sets up message channel to service.
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
return nil
}
// APIs for the services.

@ -31,7 +31,7 @@ func TestService(t *testing.T) {
t.Fatalf("New() failed: %s", err)
}
s.StartService()
s.Start()
time.Sleep(2 * time.Second)
s.StopService()
s.Stop()
}

@ -11,6 +11,7 @@ import (
"sync"
"time"
"github.com/ethereum/go-ethereum/rpc"
"github.com/harmony-one/harmony/internal/utils"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
@ -43,6 +44,8 @@ type Service struct {
pusher *push.Pusher
failStatus error
config Config
registryOnce sync.Once
}
// Handler represents a path and handler func to serve on the same port as /metrics, /healthz, /goroutinez, etc.
@ -52,8 +55,8 @@ type Handler struct {
}
var (
registryOnce sync.Once
svc = &Service{}
initOnce sync.Once
svc = &Service{}
)
func (s *Service) getJobName() string {
@ -76,11 +79,18 @@ func (s *Service) getJobName() string {
}
// NewService sets up a new instance for a given address host:port.
// An empty host will match with any IP so an address like ":19000" is perfectly acceptable.
func NewService(cfg Config, additionalHandlers ...Handler) {
// An empty host will match with any IP so an address like ":19000" is perfectly acceptable.
func NewService(cfg Config, additionalHandlers ...Handler) *Service {
initOnce.Do(func() {
svc = newService(cfg, additionalHandlers...)
})
return svc
}
func newService(cfg Config, additionalHandlers ...Handler) *Service {
if !cfg.Enabled {
utils.Logger().Info().Msg("Prometheus http server disabled...")
return
return nil
}
utils.Logger().Debug().Str("cfg", cfg.String()).Msg("Prometheus")
@ -107,26 +117,11 @@ func NewService(cfg Config, additionalHandlers ...Handler) {
Msg("Starting Prometheus server")
endpoint := fmt.Sprintf("%s:%d", svc.config.IP, svc.config.Port)
svc.server = &http.Server{Addr: endpoint, Handler: mux}
svc.Start()
}
// StopService stop the Prometheus service
func StopService() error {
return svc.Stop()
return svc
}
func (s *Service) goroutinezHandler(w http.ResponseWriter, _ *http.Request) {
stack := debug.Stack()
if _, err := w.Write(stack); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to write goroutines stack")
}
if err := pprof.Lookup("goroutine").WriteTo(w, 2); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to write pprof goroutines")
}
}
// Start the prometheus service.
func (s *Service) Start() {
// Start starts the Prometheus service
func (s *Service) Start() error {
go func() {
utils.Logger().Info().Str("address", s.server.Addr).Msg("Starting prometheus service")
err := s.server.ListenAndServe()
@ -158,15 +153,26 @@ func (s *Service) Start() {
}
}(s)
}
return nil
}
// Stop the service gracefully.
// Stop stops the Prometheus service
func (s *Service) Stop() error {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
return s.server.Shutdown(ctx)
}
func (s *Service) goroutinezHandler(w http.ResponseWriter, _ *http.Request) {
stack := debug.Stack()
if _, err := w.Write(stack); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to write goroutines stack")
}
if err := pprof.Lookup("goroutine").WriteTo(w, 2); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to write pprof goroutines")
}
}
// Status checks for any service failure conditions.
func (s *Service) Status() error {
if s.failStatus != nil {
@ -175,12 +181,21 @@ func (s *Service) Status() error {
return nil
}
// PromRegistry return the registry of prometheus service
func PromRegistry() *prometheus.Registry {
registryOnce.Do(func() {
// APIs returns the RPC APIs of the Prometheus service
func (s *Service) APIs() []rpc.API {
return nil
}
func (s *Service) getRegistry() *prometheus.Registry {
s.registryOnce.Do(func() {
if svc.registry == nil {
svc.registry = prometheus.NewRegistry()
}
})
return svc.registry
return s.registry
}
// PromRegistry returns the registry of the Prometheus service
func PromRegistry() *prometheus.Registry {
return svc.getRegistry()
}
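NewService now guards construction with sync.Once, so repeated calls return the same instance. The singleton half in isolation (a sketch with simplified fields):

```go
// Sketch of the sync.Once singleton used by NewService: the constructor
// body runs exactly once, and every caller gets the same *Service.
package sketch

import "sync"

type Service struct{ /* ... */ }

var (
	initOnce sync.Once
	svc      *Service
)

func NewService() *Service {
	initOnce.Do(func() { svc = &Service{} })
	return svc
}
```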

@ -300,34 +300,6 @@ func setupNodeAndRun(hc harmonyConfig) {
utils.Logger().Warn().Err(err).Msg("Check Local Time Accuracy Error")
}
// Prepare for graceful shutdown from os signals
osSignal := make(chan os.Signal)
signal.Notify(osSignal, os.Interrupt, syscall.SIGTERM)
go func(node *node.Node) {
for sig := range osSignal {
if sig == syscall.SIGTERM || sig == os.Interrupt {
utils.Logger().Warn().Str("signal", sig.String()).Msg("Gracefully shutting down...")
const msg = "Got %s signal. Gracefully shutting down...\n"
fmt.Fprintf(os.Stderr, msg, sig)
// stop block proposal service for leader
if node.Consensus.IsLeader() {
node.ServiceManager().StopService(service.BlockProposal)
}
if node.Consensus.Mode() == consensus.Normal {
phase := node.Consensus.GetConsensusPhase()
utils.Logger().Warn().Str("phase", phase).Msg("[shutdown] commit phase has to wait")
maxWait := time.Now().Add(2 * node.Consensus.BlockPeriod) // wait up to 2 * blockperiod in commit phase
for time.Now().Before(maxWait) &&
node.Consensus.GetConsensusPhase() == "Commit" {
utils.Logger().Warn().Msg("[shutdown] wait for consensus finished")
time.Sleep(time.Millisecond * 100)
}
}
currentNode.ShutDown()
}
}
}(currentNode)
// Parse RPC config
nodeConfig.RPCServer = nodeconfig.RPCServerConfig{
HTTPEnabled: hc.HTTP.Enabled,
@ -394,22 +366,22 @@ func setupNodeAndRun(hc harmonyConfig) {
nodeconfig.SetPeerID(myHost.GetID())
prometheusConfig := prometheus.Config{
Enabled: hc.Prometheus.Enabled,
IP: hc.Prometheus.IP,
Port: hc.Prometheus.Port,
EnablePush: hc.Prometheus.EnablePush,
Gateway: hc.Prometheus.Gateway,
Network: hc.Network.NetworkType,
Legacy: hc.General.NoStaking,
NodeType: hc.General.NodeType,
Shard: nodeConfig.ShardID,
Instance: myHost.GetID().Pretty(),
if currentNode.NodeConfig.Role() == nodeconfig.Validator {
currentNode.RegisterValidatorServices()
} else if currentNode.NodeConfig.Role() == nodeconfig.ExplorerNode {
currentNode.RegisterExplorerServices()
}
if hc.Prometheus.Enabled {
setupPrometheusService(currentNode, hc, nodeConfig.ShardID)
}
// TODO: replace this legacy syncing
currentNode.SupportSyncing()
currentNode.ServiceManagerSetup()
currentNode.RunServices()
if err := currentNode.StartServices(); err != nil {
fmt.Fprint(os.Stderr, err.Error())
os.Exit(-1)
}
if err := currentNode.StartRPC(); err != nil {
utils.Logger().Warn().
@ -423,23 +395,21 @@ func setupNodeAndRun(hc harmonyConfig) {
Msg("Start Rosetta failed")
}
if err := currentNode.StartPrometheus(prometheusConfig); err != nil {
utils.Logger().Warn().
Err(err).
Msg("Start Prometheus failed")
}
go listenOSSigAndShutDown(currentNode)
if err := currentNode.BootstrapConsensus(); err != nil {
fmt.Println("could not bootstrap consensus", err.Error())
fmt.Fprint(os.Stderr, "could not bootstrap consensus", err.Error())
if !currentNode.NodeConfig.IsOffline {
os.Exit(-1)
}
}
if err := currentNode.Start(); err != nil {
fmt.Println("could not begin network message handling for node", err.Error())
if err := currentNode.StartPubSub(); err != nil {
fmt.Fprint(os.Stderr, "could not begin network message handling for node", err.Error())
os.Exit(-1)
}
select {}
}
func nodeconfigSetShardSchedule(config harmonyConfig) {
@ -707,6 +677,23 @@ func setupConsensusAndNode(hc harmonyConfig, nodeConfig *nodeconfig.ConfigType)
return currentNode
}
func setupPrometheusService(node *node.Node, hc harmonyConfig, sid uint32) {
prometheusConfig := prometheus.Config{
Enabled: hc.Prometheus.Enabled,
IP: hc.Prometheus.IP,
Port: hc.Prometheus.Port,
EnablePush: hc.Prometheus.EnablePush,
Gateway: hc.Prometheus.Gateway,
Network: hc.Network.NetworkType,
Legacy: hc.General.NoStaking,
NodeType: hc.General.NodeType,
Shard: sid,
Instance: myHost.GetID().Pretty(),
}
p := prometheus.NewService(prometheusConfig)
node.RegisterService(service.Prometheus, p)
}
func setupBlacklist(hc harmonyConfig) (map[ethCommon.Address]struct{}, error) {
utils.Logger().Debug().Msgf("Using blacklist file at `%s`", hc.TxPool.BlacklistFile)
dat, err := ioutil.ReadFile(hc.TxPool.BlacklistFile)
@ -726,3 +713,24 @@ func setupBlacklist(hc harmonyConfig) (map[ethCommon.Address]struct{}, error) {
}
return addrMap, nil
}
func listenOSSigAndShutDown(node *node.Node) {
// Prepare for graceful shutdown from os signals
osSignal := make(chan os.Signal)
signal.Notify(osSignal, syscall.SIGINT, syscall.SIGTERM)
sig := <-osSignal
utils.Logger().Warn().Str("signal", sig.String()).Msg("Gracefully shutting down...")
const msg = "Got %s signal. Gracefully shutting down...\n"
fmt.Fprintf(os.Stderr, msg, sig)
go node.ShutDown()
for i := 10; i > 0; i-- {
<-osSignal
if i > 1 {
fmt.Printf("Already shutting down, interrupt more to force quit: (times=%v)\n", i-1)
}
}
fmt.Println("Forced QUIT.")
os.Exit(-1)
}
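listenOSSigAndShutDown replaces the old inline signal handler. For comparison, here is the same trigger written with the buffered channel that the os/signal documentation recommends (a sketch):

```go
// Sketch: signal.Notify does not block when sending, so the channel
// should be buffered or a signal may be dropped.
package sketch

import (
	"os"
	"os/signal"
	"syscall"
)

func waitForShutdownSignal() os.Signal {
	sigC := make(chan os.Signal, 1) // buffered per os/signal docs
	signal.Notify(sigC, syscall.SIGINT, syscall.SIGTERM)
	return <-sigC
}
```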

@ -469,6 +469,23 @@ func (consensus *Consensus) Start(
}()
}
// Close closes the consensus. If the node is currently in the normal commit
// phase, it waits until the commit phase ends.
func (consensus *Consensus) Close() error {
if consensus.Mode() != Normal || consensus.phase != FBFTCommit {
return nil
}
// We only need to wait when consensus is in the normal commit phase
utils.Logger().Warn().Str("phase", consensus.phase.String()).Msg("[shutdown] commit phase has to wait")
maxWait := time.Now().Add(2 * consensus.BlockPeriod)
for time.Now().Before(maxWait) && consensus.GetConsensusPhase() == "Commit" {
utils.Logger().Warn().Msg("[shutdown] wait for consensus finished")
time.Sleep(time.Millisecond * 100)
}
return nil
}
// LastMileBlockIter is the iterator to iterate over the last mile blocks in consensus cache.
// All blocks returned are guaranteed to pass the verification.
type LastMileBlockIter struct {
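Close waits out the commit phase with a bounded poll instead of blocking indefinitely. The shape of that wait (a sketch; waitUntil is a hypothetical helper):

```go
// Sketch: poll a condition until it clears or a deadline expires,
// here up to 2x the block period as Close does above.
package sketch

import "time"

func waitUntil(done func() bool, maxWait time.Duration) {
	deadline := time.Now().Add(maxWait)
	for time.Now().Before(deadline) && !done() {
		time.Sleep(100 * time.Millisecond)
	}
}
```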

@ -2,7 +2,6 @@ package node
import (
"github.com/ethereum/go-ethereum/rpc"
"github.com/harmony-one/harmony/api/service/prometheus"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/hmy"
"github.com/harmony-one/harmony/rosetta"
@ -82,17 +81,6 @@ func (node *Node) StopRPC() error {
return hmy_rpc.StopServers()
}
// StartPrometheus start promtheus metrics service
func (node *Node) StartPrometheus(cfg prometheus.Config) error {
prometheus.NewService(cfg)
return nil
}
// StopPrometheus stop prometheus metrics service
func (node *Node) StopPrometheus() error {
return prometheus.StopService()
}
// StartRosetta start rosetta service
func (node *Node) StartRosetta() error {
harmony := hmy.New(node, node.TxPool, node.CxPool, node.Consensus.ShardID)

@ -106,7 +106,6 @@ type Node struct {
// Chain configuration.
chainConfig params.ChainConfig
// map of service type to its message channel.
serviceMessageChan map[service.Type]chan *msg_pb.Message
isFirstTime bool // the node was started with a fresh database
unixTimeAtNodeStart int64
// KeysToAddrs holds the addresses of bls keys run by the node
@ -125,6 +124,10 @@ type Node struct {
committeeCache *lru.Cache
Metrics metrics.Registry
// context control for pub-sub handling
psCtx context.Context
psCancel func()
}
// Blockchain returns the blockchain for the node's current shard.
@ -548,8 +551,10 @@ var (
errConsensusMessageOnUnexpectedTopic = errors.New("received consensus on wrong topic")
)
// Start kicks off the node message handling
func (node *Node) Start() error {
// StartPubSub kicks off the node message handling
func (node *Node) StartPubSub() error {
node.psCtx, node.psCancel = context.WithCancel(context.Background())
// groupID and whether this topic is used for consensus
type t struct {
tp nodeconfig.GroupID
@ -725,10 +730,10 @@ func (node *Node) Start() error {
nodeP2PMessageCounterVec.With(prometheus.Labels{"type": "ignored"}).Inc()
return libp2p_pubsub.ValidationReject
}
select {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
if errors.Is(ctx.Err(), context.DeadlineExceeded) ||
errors.Is(ctx.Err(), context.Canceled) {
utils.Logger().Warn().
Str("topic", topicNamed).Msg("[context] exceeded validation deadline")
}
@ -740,6 +745,7 @@ func (node *Node) Start() error {
return libp2p_pubsub.ValidationReject
},
// WithValidatorTimeout is an option that sets a timeout for an (asynchronous) topic validator. By default there is no timeout in asynchronous validators.
// TODO: Currently this timeout is useless. Verify me.
libp2p_pubsub.WithValidatorTimeout(250*time.Millisecond),
// WithValidatorConcurrency sets the concurrent validator limit; default is 1024
libp2p_pubsub.WithValidatorConcurrency(p2p.SetAsideForConsensus),
@ -756,40 +762,47 @@ func (node *Node) Start() error {
// goroutine to handle consensus messages
go func() {
for m := range msgChanConsensus {
// should not take more than 10 seconds to process one message
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
msg := m
go func() {
defer cancel()
if semConsensus.TryAcquire(1) {
defer semConsensus.Release(1)
if isThisNodeAnExplorerNode {
if err := node.explorerMessageHandler(
ctx, msg.handleCArg,
); err != nil {
errChan <- withError{err, nil}
}
} else {
if err := msg.handleC(ctx, msg.handleCArg, msg.senderPubKey); err != nil {
errChan <- withError{err, msg.senderPubKey}
for {
select {
case <-node.psCtx.Done():
return
case m := <-msgChanConsensus:
// should not take more than 10 seconds to process one message
ctx, cancel := context.WithTimeout(node.psCtx, 10*time.Second)
msg := m
go func() {
defer cancel()
if semConsensus.TryAcquire(1) {
defer semConsensus.Release(1)
if isThisNodeAnExplorerNode {
if err := node.explorerMessageHandler(
ctx, msg.handleCArg,
); err != nil {
errChan <- withError{err, nil}
}
} else {
if err := msg.handleC(ctx, msg.handleCArg, msg.senderPubKey); err != nil {
errChan <- withError{err, msg.senderPubKey}
}
}
}
}
select {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
utils.Logger().Warn().
Str("topic", topicNamed).Msg("[context] exceeded consensus message handler deadline")
select {
// FIXME: wrong use of context. By this point the message has actually already been handled.
case <-ctx.Done():
if errors.Is(ctx.Err(), context.DeadlineExceeded) ||
errors.Is(ctx.Err(), context.Canceled) {
utils.Logger().Warn().
Str("topic", topicNamed).Msg("[context] exceeded consensus message handler deadline")
}
errChan <- withError{errors.WithStack(ctx.Err()), nil}
default:
return
}
errChan <- withError{errors.WithStack(ctx.Err()), nil}
default:
return
}
}()
}()
}
}
}()
@ -798,39 +811,46 @@ func (node *Node) Start() error {
// goroutine to handle node messages
go func() {
for m := range msgChanNode {
// should not take more than 10 seconds to process one message
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
msg := m
go func() {
defer cancel()
if semNode.TryAcquire(1) {
defer semNode.Release(1)
if err := msg.handleE(ctx, msg.handleEArg, msg.actionType); err != nil {
errChan <- withError{err, nil}
for {
select {
case m := <-msgChanNode:
ctx, cancel := context.WithTimeout(node.psCtx, 10*time.Second)
msg := m
go func() {
defer cancel()
if semNode.TryAcquire(1) {
defer semNode.Release(1)
if err := msg.handleE(ctx, msg.handleEArg, msg.actionType); err != nil {
errChan <- withError{err, nil}
}
}
}
select {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
utils.Logger().Warn().
Str("topic", topicNamed).Msg("[context] exceeded node message handler deadline")
select {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.DeadlineExceeded) ||
errors.Is(ctx.Err(), context.Canceled) {
utils.Logger().Warn().
Str("topic", topicNamed).Msg("[context] exceeded node message handler deadline")
}
errChan <- withError{errors.WithStack(ctx.Err()), nil}
default:
return
}
errChan <- withError{errors.WithStack(ctx.Err()), nil}
default:
return
}
}()
}()
case <-node.psCtx.Done():
return
}
}
}()
go func() {
for {
nextMsg, err := sub.Next(context.Background())
nextMsg, err := sub.Next(node.psCtx)
if err != nil {
if err == context.Canceled {
return
}
errChan <- withError{errors.WithStack(err), nil}
continue
}
@ -855,14 +875,27 @@ func (node *Node) Start() error {
}()
}
for e := range errChan {
utils.SampledLogger().Info().
Interface("item", e.payload).
Msgf("[p2p]: issue while handling incoming p2p message: %v", e.err)
}
// NOTE never gets here
go func() {
for {
select {
case <-node.psCtx.Done():
return
case e := <-errChan:
utils.SampledLogger().Info().
Interface("item", e.payload).
Msgf("[p2p]: issue while handling incoming p2p message: %v", e.err)
}
}
}()
return nil
}
// StopPubSub stops the pubsub handling
func (node *Node) StopPubSub() {
if node.psCancel != nil {
node.psCancel()
}
}
// GetSyncID returns the syncID of this node
@ -1004,6 +1037,8 @@ func New(
initMetrics()
nodeStringCounterVec.WithLabelValues("version", nodeconfig.GetVersion()).Inc()
node.serviceManager = service.NewManager()
return &node
}
@ -1136,6 +1171,30 @@ func (node *Node) ServiceManager() *service.Manager {
func (node *Node) ShutDown() {
node.Blockchain().Stop()
node.Beaconchain().Stop()
if err := node.StopRPC(); err != nil {
utils.Logger().Error().Err(err).Msg("failed to stop RPC")
}
utils.Logger().Info().Msg("stopping rosetta")
if err := node.StopRosetta(); err != nil {
utils.Logger().Error().Err(err).Msg("failed to stop rosetta")
}
utils.Logger().Info().Msg("stopping services")
if err := node.StopServices(); err != nil {
utils.Logger().Error().Err(err).Msg("failed to stop services")
}
// Currently pubSub needs to be stopped after consensus.
utils.Logger().Info().Msg("stopping pub-sub")
node.StopPubSub()
utils.Logger().Info().Msg("stopping host")
if err := node.host.Close(); err != nil {
utils.Logger().Error().Err(err).Msg("failed to stop p2p host")
}
const msg = "Successfully shut down!\n"
utils.Logger().Print(msg)
fmt.Print(msg)
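The message-handling goroutines in StartPubSub now select on the shared psCtx so that StopPubSub can unwind them. The core of that pattern (a sketch with a simplified message type):

```go
// Sketch of a cancellable consumer loop: the goroutine drains its
// channel until the shared context is cancelled.
package sketch

import "context"

func consume(ctx context.Context, msgC <-chan []byte, handle func([]byte)) {
	for {
		select {
		case <-ctx.Done():
			return // StopPubSub cancelled the context
		case m := <-msgC:
			handle(m)
		}
	}
}
```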

@ -34,7 +34,11 @@ func (node *Node) WaitForConsensusReadyV2(readySignal chan consensus.ProposalTyp
utils.Logger().Debug().
Msg("Waiting for Consensus ready")
time.Sleep(30 * time.Second) // Wait for other nodes to be ready (test-only)
select {
case <-time.After(30 * time.Second):
case <-stopChan:
return
}
for {
// keep waiting for Consensus ready

@ -3,81 +3,64 @@ package node
import (
"fmt"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/api/service"
"github.com/harmony-one/harmony/api/service/blockproposal"
"github.com/harmony-one/harmony/api/service/consensus"
"github.com/harmony-one/harmony/api/service/explorer"
"github.com/harmony-one/harmony/api/service/networkinfo"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
)
func (node *Node) setupForValidator() {
// RegisterValidatorServices registers the validator services.
func (node *Node) RegisterValidatorServices() {
_, chanPeer, _ := node.initNodeConfiguration()
// Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(
node.serviceManager.Register(
service.NetworkInfo,
networkinfo.MustNew(
node.host, node.NodeConfig.GetShardGroupID(), chanPeer, nil, node.networkInfoDHTPath(),
),
)
// Register consensus service.
node.serviceManager.RegisterService(
node.serviceManager.Register(
service.Consensus,
consensus.New(node.BlockChannel, node.Consensus, node.startConsensus),
)
// Register new block service.
node.serviceManager.RegisterService(
node.serviceManager.Register(
service.BlockProposal,
blockproposal.New(node.Consensus.ReadySignal, node.Consensus.CommitSigChannel, node.WaitForConsensusReadyV2),
)
}
func (node *Node) setupForExplorerNode() {
// RegisterExplorerServices registers the explorer services
func (node *Node) RegisterExplorerServices() {
_, chanPeer, _ := node.initNodeConfiguration()
// Register networkinfo service.
node.serviceManager.RegisterService(
node.serviceManager.Register(
service.NetworkInfo,
networkinfo.MustNew(
node.host, node.NodeConfig.GetShardGroupID(), chanPeer, nil, node.networkInfoDHTPath()),
)
// Register explorer service.
node.serviceManager.RegisterService(
node.serviceManager.Register(
service.SupportExplorer, explorer.New(&node.SelfPeer, node.stateSync, node.Blockchain()),
)
}
// ServiceManagerSetup setups service store.
func (node *Node) ServiceManagerSetup() {
node.serviceManager = &service.Manager{}
node.serviceMessageChan = make(map[service.Type]chan *msg_pb.Message)
switch node.NodeConfig.Role() {
case nodeconfig.Validator:
node.setupForValidator()
case nodeconfig.ExplorerNode:
node.setupForExplorerNode()
}
node.serviceManager.SetupServiceMessageChan(node.serviceMessageChan)
// RegisterService registers a service with the node's service manager
func (node *Node) RegisterService(st service.Type, s service.Service) {
node.serviceManager.Register(st, s)
}
// RunServices runs registered services.
func (node *Node) RunServices() {
if node.serviceManager == nil {
utils.Logger().Info().Msg("Service manager is not set up yet.")
return
}
node.serviceManager.RunServices()
// StartServices runs registered services.
func (node *Node) StartServices() error {
return node.serviceManager.StartServices()
}
// StopServices stops registered services.
func (node *Node) StopServices() {
if node.serviceManager == nil {
utils.Logger().Info().Msg("Service manager is not set up yet.")
return
}
node.serviceManager.StopServicesByRole([]service.Type{})
func (node *Node) StopServices() error {
return node.serviceManager.StopServices()
}
func (node *Node) networkInfoDHTPath() string {

@ -27,6 +27,9 @@ import (
// Host is the client + server in p2p network.
type Host interface {
Start() error
Close() error
GetSelfPeer() Peer
AddPeer(*Peer) error
GetID() libp2p_peer.ID
@ -162,6 +165,16 @@ func (host *HostV2) PubSub() *libp2p_pubsub.PubSub {
return host.pubsub
}
// Start is the current placeholder
func (host *HostV2) Start() error {
return nil
}
// Close is the current placeholder
func (host *HostV2) Close() error {
return nil
}
// C .. -> (total known peers, connected, not connected)
func (host *HostV2) C() (int, int, int) {
connected, not := 0, 0

@ -59,6 +59,9 @@ func StartServers(hmy *hmy.Harmony, config nodeconfig.RosettaServerConfig) error
// StopServers stops the rosetta http server
func StopServers() error {
if listener == nil {
return nil
}
if err := listener.Close(); err != nil {
return err
}
