diff --git a/api/proto/discovery/pingpong.go b/api/proto/discovery/pingpong.go index e91d94c8d..4fcd5076c 100644 --- a/api/proto/discovery/pingpong.go +++ b/api/proto/discovery/pingpong.go @@ -19,7 +19,6 @@ import ( "github.com/harmony-one/bls/ffi/go/bls" "github.com/harmony-one/harmony/api/proto" "github.com/harmony-one/harmony/api/proto/node" - "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" ) @@ -31,9 +30,10 @@ type PingMessageType struct { // PongMessageType defines the data structure of the Pong message type PongMessageType struct { - Version uint16 // version of the protocol - Peers []node.Info - PubKeys [][]byte // list of publickKeys, has to be identical among all validators/leaders + Version uint16 // version of the protocol + Peers []node.Info + PubKeys [][]byte // list of publickKeys, has to be identical among all validators/leaders + LeaderPubKey []byte // public key of shard leader } func (p PingMessageType) String() string { @@ -41,7 +41,7 @@ func (p PingMessageType) String() string { } func (p PongMessageType) String() string { - str := fmt.Sprintf("pong:%v=>length:%v, keys:%v\n", p.Version, len(p.Peers), len(p.PubKeys)) + str := fmt.Sprintf("pong:%v=>length:%v, keys:%v, leader:%v\n", p.Version, len(p.Peers), len(p.PubKeys), len(p.LeaderPubKey)) return str } @@ -61,7 +61,7 @@ func NewPingMessage(peer p2p.Peer) *PingMessageType { } // NewPongMessage creates a new Pong message based on a list of p2p.Peer and a list of publicKeys -func NewPongMessage(peers []p2p.Peer, pubKeys []*bls.PublicKey) *PongMessageType { +func NewPongMessage(peers []p2p.Peer, pubKeys []*bls.PublicKey, leaderKey *bls.PublicKey) *PongMessageType { pong := new(PongMessageType) pong.PubKeys = make([][]byte, 0) @@ -89,7 +89,8 @@ func NewPongMessage(peers []p2p.Peer, pubKeys []*bls.PublicKey) *PongMessageType pong.PubKeys = append(pong.PubKeys, key) } - utils.GetLogInstance().Info("[pong message]", "keys", len(pong.PubKeys), "peers", len(pong.Peers)) + pong.LeaderPubKey = leaderKey.Serialize() + // utils.GetLogInstance().Info("[pong message]", "keys", len(pong.PubKeys), "peers", len(pong.Peers), "leader", len(pong.LeaderPubKey)) return pong } diff --git a/api/proto/discovery/pingpong_test.go b/api/proto/discovery/pingpong_test.go index 61dea2d73..4e6935bd6 100644 --- a/api/proto/discovery/pingpong_test.go +++ b/api/proto/discovery/pingpong_test.go @@ -42,6 +42,8 @@ var ( } e2 = "pong:1=>length:2" + leaderPubKey = pki.GetBLSPrivateKeyFromInt(888).GetPublicKey() + pubKeys = []*bls.PublicKey{pubKey1, pubKey2} buf1 []byte @@ -63,7 +65,7 @@ func TestString(test *testing.T) { test.Errorf("expect: %v, got: %v", e3, r3) } - pong1 := NewPongMessage(p2, pubKeys) + pong1 := NewPongMessage(p2, pubKeys, leaderPubKey) r2 := fmt.Sprintf("%v", *pong1) if !strings.HasPrefix(r2, e2) { @@ -86,7 +88,7 @@ func TestSerialize(test *testing.T) { test.Error("Serialize/Deserialze Ping Message Failed") } - pong1 := NewPongMessage(p2, pubKeys) + pong1 := NewPongMessage(p2, pubKeys, leaderPubKey) buf2 = pong1.ConstructPongMessage() msg2, err := proto.GetMessagePayload(buf2) diff --git a/api/proto/discovery/readme.md b/api/proto/discovery/readme.md new file mode 100644 index 000000000..e639e12b3 --- /dev/null +++ b/api/proto/discovery/readme.md @@ -0,0 +1,27 @@ +## Ping + +Ping message sent by new node who wants to join in the consensus. +The message is broadcasted to the shard. +It contains the public BLS key of consensus and the peerID of the node. +For backward compatibility, it still contains the IP/Port of the node, +but it will be removed once the full integration of libp2p is finished as the IP/Port is not needed. + +It also contains a Role field to indicate if the node is a client node or regular node, as client node +won't join the consensus. + +## Pong + +Pong message is sent by leader to all validators, once the leader has enough validators. +It contains a list of peers and the corresponding BLS public keys. +Noted, the list of peers may not be needed once we have libp2p fully integrated. +The order of the peers and keys are important to the consensus. + +At bootstrap, the Pong message is sent out and then the consensus should start. + +## TODO + +The following two todo should be worked on once we have full libp2p integration. +For network security reason, we should in general not expose the IP/Port of the node. + +-[] remove peer info in Ping message, only keep peerID, which should be sufficient for p2p communication. +-[] remove peer list from Pong message. diff --git a/api/service/blockproposal/service.go b/api/service/blockproposal/service.go index 64522d38d..4b66b455e 100644 --- a/api/service/blockproposal/service.go +++ b/api/service/blockproposal/service.go @@ -42,3 +42,8 @@ func (s *Service) StopService() { <-s.stoppedChan utils.GetLogInstance().Info("Role conversion stopped.") } + +// NotifyService notify service +func (s *Service) NotifyService(params map[string]interface{}) { + return +} diff --git a/api/service/clientsupport/service.go b/api/service/clientsupport/service.go index db378f57e..b81aa6118 100644 --- a/api/service/clientsupport/service.go +++ b/api/service/clientsupport/service.go @@ -43,3 +43,8 @@ func (sc *Service) StartService() { func (sc *Service) StopService() { sc.grpcServer.Stop() } + +// NotifyService notify service +func (sc *Service) NotifyService(params map[string]interface{}) { + return +} diff --git a/api/service/config.go b/api/service/config.go new file mode 100644 index 000000000..ba0c4db06 --- /dev/null +++ b/api/service/config.go @@ -0,0 +1,17 @@ +package service + +import ( + "github.com/harmony-one/harmony/p2p" +) + +// NodeConfig defines a structure of node configuration +// that can be used in services. +// This is to pass node configuration to services and prvent +// cyclic imports +type NodeConfig struct { + Beacon p2p.GroupID // the beacon group ID + Group p2p.GroupID // the group ID of the shard + IsClient bool // whether this node is a client node, such as wallet/txgen + IsBeacon bool // whether this node is a beacon node or not + Actions map[p2p.GroupID]p2p.ActionType // actions on the groups +} diff --git a/api/service/consensus/service.go b/api/service/consensus/service.go index aae53c766..2cb9c7c68 100644 --- a/api/service/consensus/service.go +++ b/api/service/consensus/service.go @@ -12,18 +12,19 @@ type Service struct { consensus *consensus.Consensus stopChan chan struct{} stoppedChan chan struct{} + startChan chan struct{} } // New returns consensus service. -func New(blockChannel chan *types.Block, consensus *consensus.Consensus) *Service { - return &Service{blockChannel: blockChannel, consensus: consensus} +func New(blockChannel chan *types.Block, consensus *consensus.Consensus, startChan chan struct{}) *Service { + return &Service{blockChannel: blockChannel, consensus: consensus, startChan: startChan} } // StartService starts consensus service. func (s *Service) StartService() { s.stopChan = make(chan struct{}) s.stoppedChan = make(chan struct{}) - s.consensus.WaitForNewBlock(s.blockChannel, s.stopChan, s.stoppedChan) + s.consensus.WaitForNewBlock(s.blockChannel, s.stopChan, s.stoppedChan, s.startChan) } // StopService stops consensus service. @@ -33,3 +34,8 @@ func (s *Service) StopService() { <-s.stoppedChan utils.GetLogInstance().Info("Consensus service stopped.") } + +// NotifyService notify service +func (s *Service) NotifyService(params map[string]interface{}) { + return +} diff --git a/api/service/discovery/discovery_test.go b/api/service/discovery/discovery_test.go index b69ab0e78..030f83c43 100644 --- a/api/service/discovery/discovery_test.go +++ b/api/service/discovery/discovery_test.go @@ -3,15 +3,16 @@ package discovery import ( "testing" + "github.com/harmony-one/harmony/api/service" "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/p2pimpl" ) var ( - ip = "127.0.0.1" - port = "7099" - service *Service + ip = "127.0.0.1" + port = "7099" + dService *Service ) func TestDiscoveryService(t *testing.T) { @@ -23,9 +24,11 @@ func TestDiscoveryService(t *testing.T) { t.Fatalf("unable to new host in harmony: %v", err) } - service = New(host, "rendezvous", nil, nil) + config := service.NodeConfig{} - if service == nil { + dService = New(host, config, nil) + + if dService == nil { t.Fatalf("unable to create new discovery service") } } diff --git a/api/service/discovery/service.go b/api/service/discovery/service.go index 74e260c05..2e0d35a4f 100644 --- a/api/service/discovery/service.go +++ b/api/service/discovery/service.go @@ -3,82 +3,136 @@ package discovery import ( "time" - "github.com/ethereum/go-ethereum/log" proto_discovery "github.com/harmony-one/harmony/api/proto/discovery" + proto_node "github.com/harmony-one/harmony/api/proto/node" + "github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p/host" + + "github.com/harmony-one/harmony/api/service" ) // Service is the struct for discovery service. type Service struct { - host p2p.Host - Rendezvous string - peerChan chan p2p.Peer - stakingChan chan p2p.Peer - stopChan chan struct{} + host p2p.Host + peerChan chan p2p.Peer + stopChan chan struct{} + actionChan chan p2p.GroupAction + config service.NodeConfig + actions map[p2p.GroupID]p2p.ActionType } // New returns discovery service. // h is the p2p host -// r is the rendezvous string, we use shardID to start (TODO: leo, build two overlays of network) -func New(h p2p.Host, r string, peerChan chan p2p.Peer, stakingChan chan p2p.Peer) *Service { +// config is the node config +// (TODO: leo, build two overlays of network) +func New(h p2p.Host, config service.NodeConfig, peerChan chan p2p.Peer) *Service { return &Service{ - host: h, - Rendezvous: r, - peerChan: peerChan, - stakingChan: stakingChan, - stopChan: make(chan struct{}), + host: h, + peerChan: peerChan, + stopChan: make(chan struct{}), + actionChan: make(chan p2p.GroupAction), + config: config, + actions: make(map[p2p.GroupID]p2p.ActionType), } } // StartService starts discovery service. func (s *Service) StartService() { - log.Info("Starting discovery service.") + utils.GetLogInstance().Info("Starting discovery service.") s.Init() s.Run() } // StopService shutdowns discovery service. func (s *Service) StopService() { - log.Info("Shutting down discovery service.") + utils.GetLogInstance().Info("Shutting down discovery service.") s.stopChan <- struct{}{} - log.Info("discovery service stopped.") + utils.GetLogInstance().Info("discovery service stopped.") +} + +// NotifyService receives notification from service manager +func (s *Service) NotifyService(params map[string]interface{}) { + data := params["peer"] + action, ok := data.(p2p.GroupAction) + if !ok { + utils.GetLogInstance().Error("Wrong data type passed to NotifyService") + return + } + + utils.GetLogInstance().Info("[DISCOVERY]", "got notified", action) + s.actionChan <- action } // Run is the main function of the service func (s *Service) Run() { go s.contactP2pPeers() - // go s.pingPeer() } func (s *Service) contactP2pPeers() { tick := time.NewTicker(5 * time.Second) - ping := proto_discovery.NewPingMessage(s.host.GetSelfPeer()) - buffer := ping.ConstructPingMessage() - content := host.ConstructP2pMessage(byte(0), buffer) + + pingMsg := proto_discovery.NewPingMessage(s.host.GetSelfPeer()) + regMsgBuf := host.ConstructP2pMessage(byte(0), pingMsg.ConstructPingMessage()) + + pingMsg.Node.Role = proto_node.ClientRole + clientMsgBuf := host.ConstructP2pMessage(byte(0), pingMsg.ConstructPingMessage()) + for { select { case peer, ok := <-s.peerChan: if !ok { - log.Debug("end of info", "peer", peer.PeerID) - return + utils.GetLogInstance().Debug("end of info", "peer", peer.PeerID) + break } s.host.AddPeer(&peer) // Add to outgoing peer list //s.host.AddOutgoingPeer(peer) - log.Debug("[DISCOVERY]", "add outgoing peer", peer) - // TODO: stop ping if pinged before - // TODO: call staking servcie here if it is a new node - if s.stakingChan != nil { - s.stakingChan <- peer - } + utils.GetLogInstance().Debug("[DISCOVERY]", "add outgoing peer", peer) case <-s.stopChan: - log.Debug("[DISCOVERY] stop") + utils.GetLogInstance().Debug("[DISCOVERY] stop pinging ...") return + case action := <-s.actionChan: + s.config.Actions[action.Name] = action.Action case <-tick.C: - err := s.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, content) - if err != nil { - log.Error("Failed to send ping message", "group", p2p.GroupIDBeacon) + var err error + for g, a := range s.config.Actions { + if a == p2p.ActionPause { + // Recived Pause Message, to reduce the frequency of ping message to every 1 minute + // TODO (leo) use different timer tick for different group, mainly differentiate beacon and regular shards + // beacon ping could be less frequent than regular shard + tick.Stop() + tick = time.NewTicker(1 * time.Minute) + } + + if a == p2p.ActionStart || a == p2p.ActionResume || a == p2p.ActionPause { + if g == p2p.GroupIDBeacon { + if s.config.IsBeacon { + // beacon chain node + err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Beacon}, regMsgBuf) + } else { + // non-beacon chain node, reg as client node + err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Beacon}, clientMsgBuf) + } + } else { + // The following logical will be used for 2nd stage peer discovery process + if s.config.Group == p2p.GroupIDUnknown { + continue + } + if s.config.IsClient { + // client node of reg shard, such as wallet/txgen + err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Group}, clientMsgBuf) + } else { + // regular node of a shard + err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Group}, regMsgBuf) + } + } + if err != nil { + utils.GetLogInstance().Error("Failed to send ping message", "group", g) + } else { + utils.GetLogInstance().Info("[DISCOVERY]", "Sent Ping Message", g) + } + } } } } @@ -86,27 +140,5 @@ func (s *Service) contactP2pPeers() { // Init is to initialize for discoveryService. func (s *Service) Init() { - log.Info("Init discovery service") -} - -func (s *Service) pingPeer() { - tick := time.NewTicker(5 * time.Second) - ping := proto_discovery.NewPingMessage(s.host.GetSelfPeer()) - buffer := ping.ConstructPingMessage() - content := host.ConstructP2pMessage(byte(0), buffer) - - for { - select { - case <-tick.C: - err := s.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, content) - if err != nil { - log.Error("Failed to send ping message", "group", p2p.GroupIDBeacon) - } - case <-s.stopChan: - log.Info("Stop sending ping message") - return - } - } - // s.host.SendMessage(peer, content) - // log.Debug("Sent Ping Message via unicast to", "peer", peer) + utils.GetLogInstance().Info("Init discovery service") } diff --git a/api/service/explorer/service.go b/api/service/explorer/service.go index 5aec132dd..e90b6ab1b 100644 --- a/api/service/explorer/service.go +++ b/api/service/explorer/service.go @@ -244,3 +244,8 @@ func (s *Service) GetExplorerAddress(w http.ResponseWriter, r *http.Request) { data.Address = address json.NewEncoder(w).Encode(data.Address) } + +// NotifyService notify service +func (s *Service) NotifyService(params map[string]interface{}) { + return +} diff --git a/api/service/manager.go b/api/service/manager.go index 04d3e993d..5955238a6 100644 --- a/api/service/manager.go +++ b/api/service/manager.go @@ -14,6 +14,7 @@ type ActionType byte const ( Start ActionType = iota Stop + Notify ) // Type is service type. @@ -73,13 +74,14 @@ const ( type Action struct { Action ActionType ServiceType Type - params map[string]interface{} + Params map[string]interface{} } // Interface is the collection of functions any service needs to implement. type Interface interface { StartService() StopService() + NotifyService(map[string]interface{}) } // Manager stores all services for service manager. @@ -136,6 +138,9 @@ func (m *Manager) TakeAction(action *Action) { case Stop: fmt.Printf("Stop %s\n", action.ServiceType) service.StopService() + case Notify: + fmt.Printf("Notify %s\n", action.ServiceType) + service.NotifyService(action.Params) } } } diff --git a/api/service/manager_test.go b/api/service/manager_test.go index 7e0235859..ff3f12338 100644 --- a/api/service/manager_test.go +++ b/api/service/manager_test.go @@ -16,6 +16,10 @@ func (s *SupportSyncingTest) StopService() { fmt.Println("SupportSyncingTest stopping") } +func (s *SupportSyncingTest) NotifyService(data map[string]interface{}) { + fmt.Println("SupportSyncingTest being notified") +} + // Test TakeAction. func TestTakeAction(t *testing.T) { m := &Manager{} diff --git a/api/service/networkinfo/service.go b/api/service/networkinfo/service.go index a8b493e24..318fcb913 100644 --- a/api/service/networkinfo/service.go +++ b/api/service/networkinfo/service.go @@ -19,7 +19,7 @@ import ( // Service is the network info service. type Service struct { Host p2p.Host - Rendezvous string + Rendezvous p2p.GroupID dht *libp2pdht.IpfsDHT ctx context.Context cancel context.CancelFunc @@ -31,7 +31,7 @@ type Service struct { } // New returns role conversion service. -func New(h p2p.Host, rendezvous string, peerChan chan p2p.Peer) *Service { +func New(h p2p.Host, rendezvous p2p.GroupID, peerChan chan p2p.Peer) *Service { timeout := 30 * time.Minute ctx, cancel := context.WithTimeout(context.Background(), timeout) dht, err := libp2pdht.New(ctx, h.GetP2PHost()) @@ -86,7 +86,7 @@ func (s *Service) Init() error { // We use a rendezvous point "shardID" to announce our location. utils.GetLogInstance().Info("Announcing ourselves...") s.discovery = libp2pdis.NewRoutingDiscovery(s.dht) - libp2pdis.Advertise(s.ctx, s.discovery, s.Rendezvous) + libp2pdis.Advertise(s.ctx, s.discovery, string(s.Rendezvous)) utils.GetLogInstance().Info("Successfully announced!") return nil @@ -96,7 +96,7 @@ func (s *Service) Init() error { func (s *Service) Run() { defer close(s.stoppedChan) var err error - s.peerInfo, err = s.discovery.FindPeers(s.ctx, s.Rendezvous) + s.peerInfo, err = s.discovery.FindPeers(s.ctx, string(s.Rendezvous)) if err != nil { utils.GetLogInstance().Error("FindPeers", "error", err) } @@ -150,3 +150,8 @@ func (s *Service) StopService() { <-s.stoppedChan utils.GetLogInstance().Info("Network info service stopped.") } + +// NotifyService notify service +func (s *Service) NotifyService(params map[string]interface{}) { + return +} diff --git a/api/service/randomness/service.go b/api/service/randomness/service.go index 1a4a0bafb..d331e3ad4 100644 --- a/api/service/randomness/service.go +++ b/api/service/randomness/service.go @@ -31,3 +31,8 @@ func (s *Service) StopService() { <-s.stoppedChan utils.GetLogInstance().Info("Random generation stopped.") } + +// NotifyService notify service +func (s *Service) NotifyService(params map[string]interface{}) { + return +} diff --git a/api/service/rconversion/service.go b/api/service/rconversion/service.go index 97c40fef3..2eddb7d47 100644 --- a/api/service/rconversion/service.go +++ b/api/service/rconversion/service.go @@ -56,3 +56,8 @@ func (s *Service) StopService() { <-s.stoppedChan utils.GetLogInstance().Info("Role conversion stopped.") } + +// NotifyService notify service +func (s *Service) NotifyService(params map[string]interface{}) { + return +} diff --git a/api/service/staking/service.go b/api/service/staking/service.go index 002e274a5..d09fe49b3 100644 --- a/api/service/staking/service.go +++ b/api/service/staking/service.go @@ -115,3 +115,8 @@ func (s *Service) StopService() { <-s.stoppedChan utils.GetLogInstance().Info("Role conversion stopped.") } + +// NotifyService notify service +func (s *Service) NotifyService(params map[string]interface{}) { + return +} diff --git a/cmd/harmony.go b/cmd/harmony.go index a3b45fb3d..fa260b10a 100644 --- a/cmd/harmony.go +++ b/cmd/harmony.go @@ -255,13 +255,14 @@ func main() { } else { currentNode.Role = node.BeaconValidator } - + currentNode.MyShardGroupID = p2p.GroupIDBeacon } else { if role == "leader" { currentNode.Role = node.ShardLeader } else { currentNode.Role = node.ShardValidator } + currentNode.MyShardGroupID = p2p.GroupIDUnknown } // Add randomness protocol diff --git a/consensus/consensus.go b/consensus/consensus.go index 8105687dc..7eae8f694 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -52,7 +52,7 @@ type Consensus struct { MinPeers int // Leader's address - leader p2p.Peer + Leader p2p.Peer // Public keys of the committee including leader and validators PublicKeys []*bls.PublicKey @@ -152,7 +152,7 @@ func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Cons consensus.IsLeader = false } - consensus.leader = leader + consensus.Leader = leader for _, peer := range peers { consensus.validators.Store(utils.GetUniqueIDFromPeer(peer), peer) } @@ -169,8 +169,8 @@ func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Cons consensus.PublicKeys = allPublicKeys - prepareBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.PubKey) - commitBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.PubKey) + prepareBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.Leader.PubKey) + commitBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.Leader.PubKey) consensus.prepareBitmap = prepareBitmap consensus.commitBitmap = commitBitmap @@ -227,6 +227,7 @@ func (consensus *Consensus) checkConsensusMessage(message consensus_proto.Messag consensusID := message.ConsensusId blockHash := message.BlockHash + utils.GetLogInstance().Warn("checkConsensusMessage", "publicKey", publicKey) // Verify message signature err := verifyMessageSig(publicKey, message) if err != nil { @@ -277,6 +278,7 @@ func verifyMessageSig(signerPubKey *bls.PublicKey, message consensus_proto.Messa return err } msgHash := sha256.Sum256(messageBytes) + utils.GetLogInstance().Debug("verifyMessageSig", "signerPubKey", signerPubKey, "msgHash", msgHash) if !msgSig.VerifyHash(signerPubKey, msgHash[:]) { return errors.New("failed to verify the signature") } @@ -349,8 +351,8 @@ func (consensus *Consensus) ResetState() { consensus.prepareSigs = map[uint32]*bls.Sign{} consensus.commitSigs = map[uint32]*bls.Sign{} - prepareBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.PubKey) - commitBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.PubKey) + prepareBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.Leader.PubKey) + commitBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.Leader.PubKey) consensus.prepareBitmap = prepareBitmap consensus.commitBitmap = commitBitmap @@ -438,11 +440,11 @@ func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int { // Or the shard won't be able to reach consensus if public keys are mismatch validators := consensus.GetValidatorPeers() - pong := proto_discovery.NewPongMessage(validators, consensus.PublicKeys) + pong := proto_discovery.NewPongMessage(validators, consensus.PublicKeys, consensus.Leader.PubKey) buffer := pong.ConstructPongMessage() if utils.UseLibP2P { - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, buffer) + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), buffer)) } else { host.BroadcastMessageFromLeader(consensus.host, validators, buffer, consensus.OfflinePeers) } diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 175af2af0..da05f522c 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -30,12 +30,15 @@ var ( ) // WaitForNewBlock waits for the next new block to run consensus on -func (consensus *Consensus) WaitForNewBlock(blockChannel chan *types.Block, stopChan chan struct{}, stoppedChan chan struct{}) { +func (consensus *Consensus) WaitForNewBlock(blockChannel chan *types.Block, stopChan chan struct{}, stoppedChan chan struct{}, startChannel chan struct{}) { go func() { defer close(stoppedChan) for { select { default: + // got the signal to start consensus + _ = <-startChannel + utils.GetLogInstance().Debug("Waiting for block", "consensus", consensus) // keep waiting for new blocks newBlock := <-blockChannel @@ -58,7 +61,7 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan *types.Block, stop utils.GetLogInstance().Debug("[DRG] GOT pRnd", "pRnd", pRndAndBitmap) pRnd := pRndAndBitmap[:32] bitmap := pRndAndBitmap[32:] - vrfBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.PubKey) + vrfBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.Leader.PubKey) vrfBitmap.SetMask(bitmap) // TODO: check validity of pRnd @@ -145,6 +148,11 @@ func (consensus *Consensus) processPrepareMessage(message consensus_proto.Messag validatorPeer := consensus.getValidatorPeerByID(validatorID) + if validatorPeer == nil { + utils.GetLogInstance().Error("Invalid validator", "validatorID", validatorID) + return + } + if err := consensus.checkConsensusMessage(message, validatorPeer.PubKey); err != nil { utils.GetLogInstance().Debug("Failed to check the validator message", "validatorID", validatorID) return @@ -212,6 +220,11 @@ func (consensus *Consensus) processCommitMessage(message consensus_proto.Message validatorPeer := consensus.getValidatorPeerByID(validatorID) + if validatorPeer == nil { + utils.GetLogInstance().Error("Invalid validator", "validatorID", validatorID) + return + } + if err := consensus.checkConsensusMessage(message, validatorPeer.PubKey); err != nil { utils.GetLogInstance().Debug("Failed to check the validator message", "validatorID", validatorID) return @@ -287,7 +300,7 @@ func (consensus *Consensus) processCommitMessage(message consensus_proto.Message consensus.reportMetrics(blockObj) // Dump new block into level db. - explorer.GetStorageInstance(consensus.leader.IP, consensus.leader.Port, true).Dump(&blockObj, consensus.consensusID) + explorer.GetStorageInstance(consensus.Leader.IP, consensus.Leader.Port, true).Dump(&blockObj, consensus.consensusID) // Reset state to Finished, and clear other data. consensus.ResetState() diff --git a/consensus/consensus_test.go b/consensus/consensus_test.go index 567de9897..67fb0080f 100644 --- a/consensus/consensus_test.go +++ b/consensus/consensus_test.go @@ -31,7 +31,7 @@ func TestNew(test *testing.T) { test.Error("Consensus ReadySignal should be initialized") } - if consensus.leader.IP != leader.IP || consensus.leader.Port != leader.Port { + if consensus.Leader.IP != leader.IP || consensus.Leader.Port != leader.Port { test.Error("Consensus Leader is set to wrong Peer") } } diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 07b7f191d..799ed5e2e 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -75,7 +75,7 @@ func (consensus *Consensus) processAnnounceMessage(message consensus_proto.Messa copy(consensus.blockHash[:], blockHash[:]) consensus.block = block - if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil { + if err := consensus.checkConsensusMessage(message, consensus.Leader.PubKey); err != nil { utils.GetLogInstance().Debug("Failed to check the leader message") if err == consensus_engine.ErrConsensusIDNotMatch { utils.GetLogInstance().Debug("sending bft block to state syncing") @@ -109,7 +109,7 @@ func (consensus *Consensus) processAnnounceMessage(message consensus_proto.Messa if utils.UseLibP2P { consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) } else { - consensus.SendMessage(consensus.leader, msgToSend) + consensus.SendMessage(consensus.Leader, msgToSend) } consensus.state = PrepareDone @@ -137,7 +137,7 @@ func (consensus *Consensus) processPreparedMessage(message consensus_proto.Messa // Update readyByConsensus for attack. attack.GetInstance().UpdateConsensusReady(consensusID) - if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil { + if err := consensus.checkConsensusMessage(message, consensus.Leader.PubKey); err != nil { utils.GetLogInstance().Debug("processPreparedMessage error", "error", err) return } @@ -173,7 +173,7 @@ func (consensus *Consensus) processPreparedMessage(message consensus_proto.Messa if utils.UseLibP2P { consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) } else { - consensus.SendMessage(consensus.leader, msgToSend) + consensus.SendMessage(consensus.Leader, msgToSend) } consensus.state = CommitDone @@ -200,7 +200,7 @@ func (consensus *Consensus) processCommittedMessage(message consensus_proto.Mess // Update readyByConsensus for attack. attack.GetInstance().UpdateConsensusReady(consensusID) - if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil { + if err := consensus.checkConsensusMessage(message, consensus.Leader.PubKey); err != nil { utils.GetLogInstance().Debug("processCommittedMessage error", "error", err) return } diff --git a/node/node.go b/node/node.go index 377ea5309..b7c7f0ea9 100644 --- a/node/node.go +++ b/node/node.go @@ -25,6 +25,7 @@ import ( clientService "github.com/harmony-one/harmony/api/client/service" proto_discovery "github.com/harmony-one/harmony/api/proto/discovery" proto_node "github.com/harmony-one/harmony/api/proto/node" + "github.com/harmony-one/harmony/api/service" service_manager "github.com/harmony-one/harmony/api/service" blockproposal "github.com/harmony-one/harmony/api/service/blockproposal" "github.com/harmony-one/harmony/api/service/clientsupport" @@ -198,6 +199,12 @@ type Node struct { // Duplicated Ping Message Received duplicatedPing map[string]bool + + // Channel to notify consensus service to really start consensus + startConsensus chan struct{} + + // My GroupID + MyShardGroupID p2p.GroupID } // Blockchain returns the blockchain from node @@ -322,6 +329,8 @@ func New(host p2p.Host, consensus *bft.Consensus, db ethdb.Database) *Node { node.duplicatedPing = make(map[string]bool) + node.startConsensus = make(chan struct{}) + return &node } @@ -652,7 +661,7 @@ func (node *Node) UpdateStakingList(block *types.Block) error { for i := range txns { txn := txns[i] toAddress := txn.To() - if *toAddress != node.StakingContractAddress { //Not a address aimed at the staking contract. + if toAddress != nil && *toAddress != node.StakingContractAddress { //Not a address aimed at the staking contract. continue } currentSender, _ := types.Sender(signerType, txn) @@ -708,10 +717,33 @@ func decodeFuncSign(data []byte) string { } func (node *Node) setupForShardLeader() { + chanPeer := make(chan p2p.Peer) + + nodeConfig := service.NodeConfig{ + IsBeacon: false, + IsClient: false, + Beacon: p2p.GroupIDBeacon, + Group: p2p.GroupIDUnknown, + Actions: make(map[p2p.GroupID]p2p.ActionType), + } + nodeConfig.Actions[p2p.GroupIDBeacon] = p2p.ActionStart + + var err error + node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon) + if err != nil { + utils.GetLogInstance().Error("create group receiver error", "msg", err) + return + } + + // Register peer discovery service. No need to do staking for beacon chain node. + node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) + // Register networkinfo service. "0" is the beacon shard ID + node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer)) + // Register explorer service. node.serviceManager.RegisterService(service_manager.SupportExplorer, explorer.New(&node.SelfPeer)) // Register consensus service. - node.serviceManager.RegisterService(service_manager.Consensus, consensus_service.New(node.BlockChannel, node.Consensus)) + node.serviceManager.RegisterService(service_manager.Consensus, consensus_service.New(node.BlockChannel, node.Consensus, node.startConsensus)) // Register new block service. node.serviceManager.RegisterService(service_manager.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReady)) // Register client support service. @@ -721,10 +753,40 @@ func (node *Node) setupForShardLeader() { } func (node *Node) setupForShardValidator() { + chanPeer := make(chan p2p.Peer) + nodeConfig := service.NodeConfig{ + IsBeacon: false, + IsClient: false, + Beacon: p2p.GroupIDBeacon, + Group: p2p.GroupIDUnknown, + Actions: make(map[p2p.GroupID]p2p.ActionType), + } + nodeConfig.Actions[p2p.GroupIDBeacon] = p2p.ActionStart + + var err error + node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon) + if err != nil { + utils.GetLogInstance().Error("create group receiver error", "msg", err) + return + } + + // Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node. + node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) + // Register networkinfo service. "0" is the beacon shard ID + node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer)) + } func (node *Node) setupForBeaconLeader() { chanPeer := make(chan p2p.Peer) + nodeConfig := service.NodeConfig{ + IsBeacon: true, + IsClient: false, + Beacon: p2p.GroupIDBeacon, + Group: p2p.GroupIDUnknown, + Actions: make(map[p2p.GroupID]p2p.ActionType), + } + nodeConfig.Actions[p2p.GroupIDBeacon] = p2p.ActionStart var err error node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon) @@ -733,13 +795,13 @@ func (node *Node) setupForBeaconLeader() { return } - // Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node. - node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, "0", chanPeer, nil)) - // Register networkinfo service. "0" is the beacon shard ID - node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, "0", chanPeer)) + // Register peer discovery service. No need to do staking for beacon chain node. + node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) + // Register networkinfo service. + node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer)) // Register consensus service. - node.serviceManager.RegisterService(service_manager.Consensus, consensus_service.New(node.BlockChannel, node.Consensus)) + node.serviceManager.RegisterService(service_manager.Consensus, consensus_service.New(node.BlockChannel, node.Consensus, node.startConsensus)) // Register new block service. node.serviceManager.RegisterService(service_manager.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReady)) // Register client support service. @@ -751,6 +813,14 @@ func (node *Node) setupForBeaconLeader() { func (node *Node) setupForBeaconValidator() { chanPeer := make(chan p2p.Peer) + nodeConfig := service.NodeConfig{ + IsBeacon: true, + IsClient: false, + Beacon: p2p.GroupIDBeacon, + Group: p2p.GroupIDUnknown, + Actions: make(map[p2p.GroupID]p2p.ActionType), + } + nodeConfig.Actions[p2p.GroupIDBeacon] = p2p.ActionStart var err error node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon) @@ -759,10 +829,10 @@ func (node *Node) setupForBeaconValidator() { return } - // Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node. - node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, "0", chanPeer, nil)) - // Register networkinfo service. "0" is the beacon shard ID - node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, "0", chanPeer)) + // Register peer discovery service. No need to do staking for beacon chain node. + node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) + // Register networkinfo service. + node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer)) // Register randomness service node.serviceManager.RegisterService(service_manager.Randomness, randomness_service.New(node.DRand)) } @@ -771,6 +841,15 @@ func (node *Node) setupForNewNode() { chanPeer := make(chan p2p.Peer) stakingPeer := make(chan p2p.Peer) + nodeConfig := service.NodeConfig{ + IsBeacon: false, + IsClient: false, + Beacon: p2p.GroupIDBeacon, + Group: p2p.GroupIDUnknown, + Actions: make(map[p2p.GroupID]p2p.ActionType), + } + nodeConfig.Actions[p2p.GroupIDBeacon] = p2p.ActionStart + var err error node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon) if err != nil { @@ -781,9 +860,9 @@ func (node *Node) setupForNewNode() { // Register staking service. node.serviceManager.RegisterService(service_manager.Staking, staking.New(node.AccountKey, 0, stakingPeer)) // Register peer discovery service. "0" is the beacon shard ID - node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, "0", chanPeer, stakingPeer)) + node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer)) // Register networkinfo service. "0" is the beacon shard ID - node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, "0", chanPeer)) + node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer)) // TODO: how to restart networkinfo and discovery service after receiving shard id info from beacon chain? } diff --git a/node/node_handler.go b/node/node_handler.go index eb36e4d98..19a155a74 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -231,7 +231,7 @@ func (node *Node) BroadcastNewBlock(newBlock *types.Block) { if node.ClientPeer != nil { utils.GetLogInstance().Debug("Sending new block to client", "client", node.ClientPeer) if utils.UseLibP2P { - node.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock})) + node.host.SendMessageToGroups([]p2p.GroupID{node.MyShardGroupID}, proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock})) } else { node.SendMessage(*node.ClientPeer, proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock})) } @@ -261,6 +261,7 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) bool { // 2. [leader] send new block to the client func (node *Node) PostConsensusProcessing(newBlock *types.Block) { if node.Role == BeaconLeader || node.Role == BeaconValidator { + utils.GetLogInstance().Info("PostConsensusProcessing", "newBlock", newBlock) node.UpdateStakingList(newBlock) } if node.Consensus.IsLeader { @@ -338,7 +339,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte, sender string) int { // This is the old way of broadcasting pong message if node.Consensus.IsLeader && !utils.UseLibP2P { peers := node.Consensus.GetValidatorPeers() - pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys) + pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys, node.Consensus.Leader.PubKey) buffer := pong.ConstructPongMessage() // Send a Pong message directly to the sender @@ -386,19 +387,20 @@ func (node *Node) SendPongMessage() { } else { // stable number of peers/pubkeys, sent the pong message if !sentMessage { - pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys) + pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys, node.Consensus.Leader.PubKey) buffer := pong.ConstructPongMessage() content := host.ConstructP2pMessage(byte(0), buffer) - err := node.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, content) + err := node.host.SendMessageToGroups([]p2p.GroupID{node.MyShardGroupID}, content) if err != nil { - utils.GetLogInstance().Error("[PONG] failed to send pong message", "group", p2p.GroupIDBeacon) + utils.GetLogInstance().Error("[PONG] failed to send pong message", "group", node.MyShardGroupID) continue } else { - utils.GetLogInstance().Info("[PONG] sent pong message to", "group", p2p.GroupIDBeacon) + utils.GetLogInstance().Info("[PONG] sent pong message to", "group", node.MyShardGroupID) } sentMessage = true // stop sending ping message node.serviceManager.TakeAction(&service.Action{Action: service.Stop, ServiceType: service.PeerDiscovery}) + node.startConsensus <- struct{}{} } } numPeers = numPeersNow @@ -436,6 +438,12 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { node.AddPeers(peers) } + node.Consensus.Leader.PubKey = &bls.PublicKey{} + err = node.Consensus.Leader.PubKey.Deserialize(pong.LeaderPubKey) + if err != nil { + utils.GetLogInstance().Error("Unmarshal Leader PubKey Failed", "error", err) + } + // Reset Validator PublicKeys every time we receive PONG message from Leader // The PublicKeys has to be idential across the shard on every node // TODO (lc): we need to handle RemovePeer situation @@ -452,7 +460,7 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { publicKeys = append(publicKeys, &key) } - utils.GetLogInstance().Debug("[pongMessageHandler]", "#keys", len(publicKeys), "#peers", len(peers)) + // utils.GetLogInstance().Debug("[pongMessageHandler]", "#keys", len(publicKeys), "#peers", len(peers)) if node.State == NodeWaitToJoin { node.State = NodeReadyForConsensus @@ -463,6 +471,9 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { } // Stop discovery service after received pong message - node.serviceManager.TakeAction(&service.Action{Action: service.Stop, ServiceType: service.PeerDiscovery}) + data := make(map[string]interface{}) + data["peer"] = p2p.GroupAction{Name: node.MyShardGroupID, Action: p2p.ActionPause} + + node.serviceManager.TakeAction(&service.Action{Action: service.Notify, ServiceType: service.PeerDiscovery, Params: data}) return node.Consensus.UpdatePublicKeys(publicKeys) } diff --git a/node/node_test.go b/node/node_test.go index 7356e3fe0..1c8036f3c 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/harmony-one/bls/ffi/go/bls" "github.com/harmony-one/harmony/drand" "github.com/ethereum/go-ethereum/common" @@ -148,7 +149,10 @@ func sendPongMessage(node *Node, leader p2p.Peer) { PubKey: pubKey2, } - pong1 := proto_discovery.NewPongMessage([]p2p.Peer{p1, p2}, nil) + pubKeys := []*bls.PublicKey{pubKey1, pubKey2} + leaderPubKey := pki.GetBLSPrivateKeyFromInt(888).GetPublicKey() + + pong1 := proto_discovery.NewPongMessage([]p2p.Peer{p1, p2}, pubKeys, leaderPubKey) buf1 := pong1.ConstructPongMessage() fmt.Println("waiting for 10 seconds ...") diff --git a/p2p/group.go b/p2p/group.go index 159371f0f..68304969d 100644 --- a/p2p/group.go +++ b/p2p/group.go @@ -24,10 +24,47 @@ func (id GroupID) String() string { // Const of group ID const ( - GroupIDBeacon GroupID = "harmony/0.0.1/beacon" - GroupIDGlobal GroupID = "harmony/0.0.1/global" + GroupIDBeacon GroupID = "harmony/0.0.1/beacon" + GroupIDGlobal GroupID = "harmony/0.0.1/global" + GroupIDUnknown GroupID = "B1acKh0lE" ) +// ActionType lists action on group +type ActionType uint + +// Const of different Action type +const ( + ActionStart ActionType = iota + ActionPause + ActionResume + ActionStop + ActionUnknown +) + +func (a ActionType) String() string { + switch a { + case ActionStart: + return "ActionStart" + case ActionPause: + return "ActionPause" + case ActionResume: + return "ActionResume" + case ActionStop: + return "ActionStop" + } + return "ActionUnknown" +} + +// GroupAction specify action on corresponding group +type GroupAction struct { + Name GroupID + Action ActionType +} + +func (g GroupAction) String() string { + return fmt.Sprintf("%s/%s", g.Name, g.Action) +} + // GroupReceiver is a multicast group message receiver interface. type GroupReceiver interface { // Close closes this receiver. diff --git a/p2p/host/message.go b/p2p/host/message.go index 522343bc6..af90a81ce 100644 --- a/p2p/host/message.go +++ b/p2p/host/message.go @@ -3,7 +3,6 @@ package host import ( "encoding/binary" "net" - "runtime" "time" "github.com/ethereum/go-ethereum/log" @@ -27,13 +26,13 @@ func BroadcastMessage(h p2p.Host, peers []p2p.Peer, msg []byte, lostPeer chan p2 content := ConstructP2pMessage(byte(17), msg) length := len(content) - log.Info("Start Broadcasting", "gomaxprocs", runtime.GOMAXPROCS(0), "Size", length) - start := time.Now() + // log.Info("Start Broadcasting", "gomaxprocs", runtime.GOMAXPROCS(0), "Size", length) + // start := time.Now() for _, peer := range peers { peerCopy := peer go send(h, peerCopy, content, lostPeer) } - log.Info("Broadcasting Done", "time spent(s)", time.Since(start).Seconds()) + // log.Info("Broadcasting Done", "time spent(s)", time.Since(start).Seconds()) // Keep track of block propagation time // Assume 1M message is for block propagation