set up message channels

pull/491/head
Minh Doan 6 years ago committed by Minh Doan
parent 0589f07431
commit 3d81d71a03
  1. 7
      api/service/blockproposal/service.go
  2. 23
      api/service/clientsupport/service.go
  3. 7
      api/service/consensus/service.go
  4. 22
      api/service/discovery/service.go
  5. 17
      api/service/explorer/service.go
  6. 12
      api/service/manager.go
  7. 7
      api/service/networkinfo/service.go
  8. 7
      api/service/randomness/service.go
  9. 7
      api/service/rconversion/service.go
  10. 6
      api/service/staking/service.go
  11. 9
      node/node.go

@ -1,6 +1,7 @@
package blockproposal package blockproposal
import ( import (
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
) )
@ -9,6 +10,7 @@ type Service struct {
stopChan chan struct{} stopChan chan struct{}
stoppedChan chan struct{} stoppedChan chan struct{}
readySignal chan struct{} readySignal chan struct{}
messageChan chan *msg_pb.Message
waitForConsensusReady func(readySignal chan struct{}, stopChan chan struct{}, stoppedChan chan struct{}) waitForConsensusReady func(readySignal chan struct{}, stopChan chan struct{}, stoppedChan chan struct{})
} }
@ -47,3 +49,8 @@ func (s *Service) StopService() {
func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) NotifyService(params map[string]interface{}) {
return return
} }
// SetMessageChan sets up message channel to service.
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
}

@ -5,6 +5,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
clientService "github.com/harmony-one/harmony/api/client/service" clientService "github.com/harmony-one/harmony/api/client/service"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/state"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -16,10 +17,11 @@ const (
// Service is the client support service. // Service is the client support service.
type Service struct { type Service struct {
server *clientService.Server server *clientService.Server
grpcServer *grpc.Server grpcServer *grpc.Server
ip string ip string
port string port string
messageChan chan *msg_pb.Message
} }
// New returns new client support service. // New returns new client support service.
@ -35,13 +37,18 @@ func New(stateReader func() (*state.DB, error),
} }
// StartService starts client support service. // StartService starts client support service.
func (sc *Service) StartService() { func (s *Service) StartService() {
sc.grpcServer, _ = sc.server.Start(sc.ip, sc.port) s.grpcServer, _ = s.server.Start(s.ip, s.port)
} }
// StopService stops client support service. // StopService stops client support service.
func (sc *Service) StopService() { func (s *Service) StopService() {
sc.grpcServer.Stop() s.grpcServer.Stop()
}
// SetMessageChan sets up message channel to service.
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
} }
// NotifyService notify service // NotifyService notify service

@ -1,6 +1,7 @@
package consensus package consensus
import ( import (
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
@ -13,6 +14,7 @@ type Service struct {
stopChan chan struct{} stopChan chan struct{}
stoppedChan chan struct{} stoppedChan chan struct{}
startChan chan struct{} startChan chan struct{}
messageChan chan *msg_pb.Message
} }
// New returns consensus service. // New returns consensus service.
@ -40,3 +42,8 @@ func (s *Service) StopService() {
func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) NotifyService(params map[string]interface{}) {
return return
} }
// SetMessageChan sets up message channel to service.
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
}

@ -4,22 +4,23 @@ import (
"time" "time"
proto_discovery "github.com/harmony-one/harmony/api/proto/discovery" proto_discovery "github.com/harmony-one/harmony/api/proto/discovery"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
proto_node "github.com/harmony-one/harmony/api/proto/node" proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/api/service"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host" "github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/api/service"
) )
// Service is the struct for discovery service. // Service is the struct for discovery service.
type Service struct { type Service struct {
host p2p.Host host p2p.Host
peerChan chan p2p.Peer peerChan chan p2p.Peer
stopChan chan struct{} stopChan chan struct{}
actionChan chan p2p.GroupAction actionChan chan p2p.GroupAction
config service.NodeConfig config service.NodeConfig
actions map[p2p.GroupID]p2p.ActionType actions map[p2p.GroupID]p2p.ActionType
messageChan chan *msg_pb.Message
} }
// New returns discovery service. // New returns discovery service.
@ -142,3 +143,8 @@ func (s *Service) contactP2pPeers() {
func (s *Service) Init() { func (s *Service) Init() {
utils.GetLogInstance().Info("Init discovery service") utils.GetLogInstance().Info("Init discovery service")
} }
// SetMessageChan sets up message channel to service.
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
}

@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
"github.com/gorilla/mux" "github.com/gorilla/mux"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
@ -23,11 +24,12 @@ const (
// Service is the struct for explorer service. // Service is the struct for explorer service.
type Service struct { type Service struct {
router *mux.Router router *mux.Router
IP string IP string
Port string Port string
storage *Storage storage *Storage
server *http.Server server *http.Server
messageChan chan *msg_pb.Message
} }
// New returns explorer service. // New returns explorer service.
@ -249,3 +251,8 @@ func (s *Service) GetExplorerAddress(w http.ResponseWriter, r *http.Request) {
func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) NotifyService(params map[string]interface{}) {
return return
} }
// SetMessageChan sets up message channel to service.
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
}

@ -4,6 +4,8 @@ import (
"fmt" "fmt"
"time" "time"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/api/service"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
) )
@ -80,6 +82,7 @@ type Action struct {
// Interface is the collection of functions any service needs to implement. // Interface is the collection of functions any service needs to implement.
type Interface interface { type Interface interface {
StartService() StartService()
SetMessageChan(msgChan chan *msg_pb.Message)
StopService() StopService()
NotifyService(map[string]interface{}) NotifyService(map[string]interface{})
} }
@ -174,3 +177,12 @@ func (m *Manager) RunServices() {
m.TakeAction(action) m.TakeAction(action)
} }
} }
// SetupServiceMessageChan sets up message channel to services.
func (m *Manager) SetupServiceMessageChan(mapServiceTypeChan map[service.Type]chan *msg_pb.Message) {
for serviceType, service := range m.services {
// TODO(minhdoan): for performance, consider buffered channel.
mapServiceTypeChan[serviceType] = make(chan *msg_pb.Message)
service.SetMessageChan(mapServiceTypeChan[serviceType])
}
}

@ -8,6 +8,7 @@ import (
"sync" "sync"
"time" "time"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
libp2pdis "github.com/libp2p/go-libp2p-discovery" libp2pdis "github.com/libp2p/go-libp2p-discovery"
@ -28,6 +29,7 @@ type Service struct {
peerChan chan p2p.Peer peerChan chan p2p.Peer
peerInfo <-chan peerstore.PeerInfo peerInfo <-chan peerstore.PeerInfo
discovery *libp2pdis.RoutingDiscovery discovery *libp2pdis.RoutingDiscovery
messageChan chan *msg_pb.Message
} }
// New returns role conversion service. // New returns role conversion service.
@ -157,3 +159,8 @@ func (s *Service) StopService() {
func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) NotifyService(params map[string]interface{}) {
return return
} }
// SetMessageChan sets up message channel to service.
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
}

@ -1,6 +1,7 @@
package randomness package randomness
import ( import (
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/drand" "github.com/harmony-one/harmony/drand"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
) )
@ -10,6 +11,7 @@ type Service struct {
stopChan chan struct{} stopChan chan struct{}
stoppedChan chan struct{} stoppedChan chan struct{}
DRand *drand.DRand DRand *drand.DRand
messageChan chan *msg_pb.Message
} }
// New returns randomness generation service. // New returns randomness generation service.
@ -36,3 +38,8 @@ func (s *Service) StopService() {
func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) NotifyService(params map[string]interface{}) {
return return
} }
// SetMessageChan sets up message channel to service.
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
}

@ -1,6 +1,7 @@
package rconversion package rconversion
import ( import (
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
) )
@ -8,6 +9,7 @@ import (
type Service struct { type Service struct {
stopChan chan struct{} stopChan chan struct{}
stoppedChan chan struct{} stoppedChan chan struct{}
messageChan chan *msg_pb.Message
} }
// New returns role conversion service. // New returns role conversion service.
@ -61,3 +63,8 @@ func (s *Service) StopService() {
func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) NotifyService(params map[string]interface{}) {
return return
} }
// SetMessageChan sets up message channel to service.
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
}

@ -14,6 +14,7 @@ import (
proto_common "github.com/harmony-one/harmony/api/proto" proto_common "github.com/harmony-one/harmony/api/proto"
"github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
@ -41,6 +42,7 @@ type Service struct {
stakingAmount int64 stakingAmount int64
state State state State
beaconChain *core.BlockChain beaconChain *core.BlockChain
messageChan chan *msg_pb.Message
} }
// New returns staking service. // New returns staking service.
@ -182,4 +184,8 @@ func (s *Service) StopService() {
// NotifyService notify service // NotifyService notify service
func (s *Service) NotifyService(params map[string]interface{}) { func (s *Service) NotifyService(params map[string]interface{}) {
return return
// SetMessageChan sets up message channel to service.
func (s *Service) SetMessageChan(messageChan chan *msg_pb.Message) {
s.messageChan = messageChan
} }

@ -10,8 +10,6 @@ import (
"sync" "sync"
"time" "time"
"github.com/harmony-one/harmony/drand"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
@ -21,6 +19,7 @@ import (
"github.com/harmony-one/harmony/api/client" "github.com/harmony-one/harmony/api/client"
clientService "github.com/harmony-one/harmony/api/client/service" clientService "github.com/harmony-one/harmony/api/client/service"
proto_discovery "github.com/harmony-one/harmony/api/proto/discovery" proto_discovery "github.com/harmony-one/harmony/api/proto/discovery"
msg_pb "github.com/harmony-one/harmony/api/proto/message"
proto_node "github.com/harmony-one/harmony/api/proto/node" proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/api/service" "github.com/harmony-one/harmony/api/service"
service_manager "github.com/harmony-one/harmony/api/service" service_manager "github.com/harmony-one/harmony/api/service"
@ -40,6 +39,7 @@ import (
"github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/pki" "github.com/harmony-one/harmony/crypto/pki"
"github.com/harmony-one/harmony/drand"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
@ -207,6 +207,9 @@ type Node struct {
// My ShardClient GroupID // My ShardClient GroupID
MyClientGroupID p2p.GroupID MyClientGroupID p2p.GroupID
// map of service type to its message channel.
serviceMessageChan map[service.Type]chan *msg_pb.Message
} }
// Blockchain returns the blockchain from node // Blockchain returns the blockchain from node
@ -719,6 +722,7 @@ func (node *Node) AddBeaconChainDatabase(db ethdb.Database) {
// ServiceManagerSetup setups service store. // ServiceManagerSetup setups service store.
func (node *Node) ServiceManagerSetup() { func (node *Node) ServiceManagerSetup() {
node.serviceManager = &service_manager.Manager{} node.serviceManager = &service_manager.Manager{}
node.serviceMessageChan = make(map[service.Type]chan *msg_pb.Message)
switch node.Role { switch node.Role {
case ShardLeader: case ShardLeader:
node.setupForShardLeader() node.setupForShardLeader()
@ -733,6 +737,7 @@ func (node *Node) ServiceManagerSetup() {
case ClientNode: case ClientNode:
node.setupForClientNode() node.setupForClientNode()
} }
node.serviceManager.SetupServiceMessageChan(node.serviceMessageChan)
} }
// RunServices runs registered services. // RunServices runs registered services.

Loading…
Cancel
Save