Merge pull request #465 from LeoHChen/libp2p

Use Libp2p pubsub to reach consensus
pull/469/head
Leo Chen 6 years ago committed by GitHub
commit e2b0452519
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      api/proto/discovery/pingpong.go
  2. 6
      api/proto/discovery/pingpong_test.go
  3. 27
      api/proto/discovery/readme.md
  4. 5
      api/service/blockproposal/service.go
  5. 5
      api/service/clientsupport/service.go
  6. 17
      api/service/config.go
  7. 12
      api/service/consensus/service.go
  8. 13
      api/service/discovery/discovery_test.go
  9. 142
      api/service/discovery/service.go
  10. 5
      api/service/explorer/service.go
  11. 7
      api/service/manager.go
  12. 4
      api/service/manager_test.go
  13. 13
      api/service/networkinfo/service.go
  14. 5
      api/service/randomness/service.go
  15. 5
      api/service/rconversion/service.go
  16. 5
      api/service/staking/service.go
  17. 3
      cmd/harmony.go
  18. 18
      consensus/consensus.go
  19. 19
      consensus/consensus_leader.go
  20. 2
      consensus/consensus_test.go
  21. 10
      consensus/consensus_validator.go
  22. 105
      node/node.go
  23. 27
      node/node_handler.go
  24. 6
      node/node_test.go
  25. 41
      p2p/group.go
  26. 7
      p2p/host/message.go

@ -19,7 +19,6 @@ import (
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/api/proto"
"github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
)
@ -31,9 +30,10 @@ type PingMessageType struct {
// PongMessageType defines the data structure of the Pong message
type PongMessageType struct {
Version uint16 // version of the protocol
Peers []node.Info
PubKeys [][]byte // list of publickKeys, has to be identical among all validators/leaders
Version uint16 // version of the protocol
Peers []node.Info
PubKeys [][]byte // list of publickKeys, has to be identical among all validators/leaders
LeaderPubKey []byte // public key of shard leader
}
func (p PingMessageType) String() string {
@ -41,7 +41,7 @@ func (p PingMessageType) String() string {
}
func (p PongMessageType) String() string {
str := fmt.Sprintf("pong:%v=>length:%v, keys:%v\n", p.Version, len(p.Peers), len(p.PubKeys))
str := fmt.Sprintf("pong:%v=>length:%v, keys:%v, leader:%v\n", p.Version, len(p.Peers), len(p.PubKeys), len(p.LeaderPubKey))
return str
}
@ -61,7 +61,7 @@ func NewPingMessage(peer p2p.Peer) *PingMessageType {
}
// NewPongMessage creates a new Pong message based on a list of p2p.Peer and a list of publicKeys
func NewPongMessage(peers []p2p.Peer, pubKeys []*bls.PublicKey) *PongMessageType {
func NewPongMessage(peers []p2p.Peer, pubKeys []*bls.PublicKey, leaderKey *bls.PublicKey) *PongMessageType {
pong := new(PongMessageType)
pong.PubKeys = make([][]byte, 0)
@ -89,7 +89,8 @@ func NewPongMessage(peers []p2p.Peer, pubKeys []*bls.PublicKey) *PongMessageType
pong.PubKeys = append(pong.PubKeys, key)
}
utils.GetLogInstance().Info("[pong message]", "keys", len(pong.PubKeys), "peers", len(pong.Peers))
pong.LeaderPubKey = leaderKey.Serialize()
// utils.GetLogInstance().Info("[pong message]", "keys", len(pong.PubKeys), "peers", len(pong.Peers), "leader", len(pong.LeaderPubKey))
return pong
}

@ -42,6 +42,8 @@ var (
}
e2 = "pong:1=>length:2"
leaderPubKey = pki.GetBLSPrivateKeyFromInt(888).GetPublicKey()
pubKeys = []*bls.PublicKey{pubKey1, pubKey2}
buf1 []byte
@ -63,7 +65,7 @@ func TestString(test *testing.T) {
test.Errorf("expect: %v, got: %v", e3, r3)
}
pong1 := NewPongMessage(p2, pubKeys)
pong1 := NewPongMessage(p2, pubKeys, leaderPubKey)
r2 := fmt.Sprintf("%v", *pong1)
if !strings.HasPrefix(r2, e2) {
@ -86,7 +88,7 @@ func TestSerialize(test *testing.T) {
test.Error("Serialize/Deserialze Ping Message Failed")
}
pong1 := NewPongMessage(p2, pubKeys)
pong1 := NewPongMessage(p2, pubKeys, leaderPubKey)
buf2 = pong1.ConstructPongMessage()
msg2, err := proto.GetMessagePayload(buf2)

@ -0,0 +1,27 @@
## Ping
Ping message sent by new node who wants to join in the consensus.
The message is broadcasted to the shard.
It contains the public BLS key of consensus and the peerID of the node.
For backward compatibility, it still contains the IP/Port of the node,
but it will be removed once the full integration of libp2p is finished as the IP/Port is not needed.
It also contains a Role field to indicate if the node is a client node or regular node, as client node
won't join the consensus.
## Pong
Pong message is sent by leader to all validators, once the leader has enough validators.
It contains a list of peers and the corresponding BLS public keys.
Noted, the list of peers may not be needed once we have libp2p fully integrated.
The order of the peers and keys are important to the consensus.
At bootstrap, the Pong message is sent out and then the consensus should start.
## TODO
The following two todo should be worked on once we have full libp2p integration.
For network security reason, we should in general not expose the IP/Port of the node.
-[] remove peer info in Ping message, only keep peerID, which should be sufficient for p2p communication.
-[] remove peer list from Pong message.

@ -42,3 +42,8 @@ func (s *Service) StopService() {
<-s.stoppedChan
utils.GetLogInstance().Info("Role conversion stopped.")
}
// NotifyService notify service
func (s *Service) NotifyService(params map[string]interface{}) {
return
}

@ -43,3 +43,8 @@ func (sc *Service) StartService() {
func (sc *Service) StopService() {
sc.grpcServer.Stop()
}
// NotifyService notify service
func (sc *Service) NotifyService(params map[string]interface{}) {
return
}

@ -0,0 +1,17 @@
package service
import (
"github.com/harmony-one/harmony/p2p"
)
// NodeConfig defines a structure of node configuration
// that can be used in services.
// This is to pass node configuration to services and prvent
// cyclic imports
type NodeConfig struct {
Beacon p2p.GroupID // the beacon group ID
Group p2p.GroupID // the group ID of the shard
IsClient bool // whether this node is a client node, such as wallet/txgen
IsBeacon bool // whether this node is a beacon node or not
Actions map[p2p.GroupID]p2p.ActionType // actions on the groups
}

@ -12,18 +12,19 @@ type Service struct {
consensus *consensus.Consensus
stopChan chan struct{}
stoppedChan chan struct{}
startChan chan struct{}
}
// New returns consensus service.
func New(blockChannel chan *types.Block, consensus *consensus.Consensus) *Service {
return &Service{blockChannel: blockChannel, consensus: consensus}
func New(blockChannel chan *types.Block, consensus *consensus.Consensus, startChan chan struct{}) *Service {
return &Service{blockChannel: blockChannel, consensus: consensus, startChan: startChan}
}
// StartService starts consensus service.
func (s *Service) StartService() {
s.stopChan = make(chan struct{})
s.stoppedChan = make(chan struct{})
s.consensus.WaitForNewBlock(s.blockChannel, s.stopChan, s.stoppedChan)
s.consensus.WaitForNewBlock(s.blockChannel, s.stopChan, s.stoppedChan, s.startChan)
}
// StopService stops consensus service.
@ -33,3 +34,8 @@ func (s *Service) StopService() {
<-s.stoppedChan
utils.GetLogInstance().Info("Consensus service stopped.")
}
// NotifyService notify service
func (s *Service) NotifyService(params map[string]interface{}) {
return
}

@ -3,15 +3,16 @@ package discovery
import (
"testing"
"github.com/harmony-one/harmony/api/service"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/p2pimpl"
)
var (
ip = "127.0.0.1"
port = "7099"
service *Service
ip = "127.0.0.1"
port = "7099"
dService *Service
)
func TestDiscoveryService(t *testing.T) {
@ -23,9 +24,11 @@ func TestDiscoveryService(t *testing.T) {
t.Fatalf("unable to new host in harmony: %v", err)
}
service = New(host, "rendezvous", nil, nil)
config := service.NodeConfig{}
if service == nil {
dService = New(host, config, nil)
if dService == nil {
t.Fatalf("unable to create new discovery service")
}
}

@ -3,82 +3,136 @@ package discovery
import (
"time"
"github.com/ethereum/go-ethereum/log"
proto_discovery "github.com/harmony-one/harmony/api/proto/discovery"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/api/service"
)
// Service is the struct for discovery service.
type Service struct {
host p2p.Host
Rendezvous string
peerChan chan p2p.Peer
stakingChan chan p2p.Peer
stopChan chan struct{}
host p2p.Host
peerChan chan p2p.Peer
stopChan chan struct{}
actionChan chan p2p.GroupAction
config service.NodeConfig
actions map[p2p.GroupID]p2p.ActionType
}
// New returns discovery service.
// h is the p2p host
// r is the rendezvous string, we use shardID to start (TODO: leo, build two overlays of network)
func New(h p2p.Host, r string, peerChan chan p2p.Peer, stakingChan chan p2p.Peer) *Service {
// config is the node config
// (TODO: leo, build two overlays of network)
func New(h p2p.Host, config service.NodeConfig, peerChan chan p2p.Peer) *Service {
return &Service{
host: h,
Rendezvous: r,
peerChan: peerChan,
stakingChan: stakingChan,
stopChan: make(chan struct{}),
host: h,
peerChan: peerChan,
stopChan: make(chan struct{}),
actionChan: make(chan p2p.GroupAction),
config: config,
actions: make(map[p2p.GroupID]p2p.ActionType),
}
}
// StartService starts discovery service.
func (s *Service) StartService() {
log.Info("Starting discovery service.")
utils.GetLogInstance().Info("Starting discovery service.")
s.Init()
s.Run()
}
// StopService shutdowns discovery service.
func (s *Service) StopService() {
log.Info("Shutting down discovery service.")
utils.GetLogInstance().Info("Shutting down discovery service.")
s.stopChan <- struct{}{}
log.Info("discovery service stopped.")
utils.GetLogInstance().Info("discovery service stopped.")
}
// NotifyService receives notification from service manager
func (s *Service) NotifyService(params map[string]interface{}) {
data := params["peer"]
action, ok := data.(p2p.GroupAction)
if !ok {
utils.GetLogInstance().Error("Wrong data type passed to NotifyService")
return
}
utils.GetLogInstance().Info("[DISCOVERY]", "got notified", action)
s.actionChan <- action
}
// Run is the main function of the service
func (s *Service) Run() {
go s.contactP2pPeers()
// go s.pingPeer()
}
func (s *Service) contactP2pPeers() {
tick := time.NewTicker(5 * time.Second)
ping := proto_discovery.NewPingMessage(s.host.GetSelfPeer())
buffer := ping.ConstructPingMessage()
content := host.ConstructP2pMessage(byte(0), buffer)
pingMsg := proto_discovery.NewPingMessage(s.host.GetSelfPeer())
regMsgBuf := host.ConstructP2pMessage(byte(0), pingMsg.ConstructPingMessage())
pingMsg.Node.Role = proto_node.ClientRole
clientMsgBuf := host.ConstructP2pMessage(byte(0), pingMsg.ConstructPingMessage())
for {
select {
case peer, ok := <-s.peerChan:
if !ok {
log.Debug("end of info", "peer", peer.PeerID)
return
utils.GetLogInstance().Debug("end of info", "peer", peer.PeerID)
break
}
s.host.AddPeer(&peer)
// Add to outgoing peer list
//s.host.AddOutgoingPeer(peer)
log.Debug("[DISCOVERY]", "add outgoing peer", peer)
// TODO: stop ping if pinged before
// TODO: call staking servcie here if it is a new node
if s.stakingChan != nil {
s.stakingChan <- peer
}
utils.GetLogInstance().Debug("[DISCOVERY]", "add outgoing peer", peer)
case <-s.stopChan:
log.Debug("[DISCOVERY] stop")
utils.GetLogInstance().Debug("[DISCOVERY] stop pinging ...")
return
case action := <-s.actionChan:
s.config.Actions[action.Name] = action.Action
case <-tick.C:
err := s.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, content)
if err != nil {
log.Error("Failed to send ping message", "group", p2p.GroupIDBeacon)
var err error
for g, a := range s.config.Actions {
if a == p2p.ActionPause {
// Recived Pause Message, to reduce the frequency of ping message to every 1 minute
// TODO (leo) use different timer tick for different group, mainly differentiate beacon and regular shards
// beacon ping could be less frequent than regular shard
tick.Stop()
tick = time.NewTicker(1 * time.Minute)
}
if a == p2p.ActionStart || a == p2p.ActionResume || a == p2p.ActionPause {
if g == p2p.GroupIDBeacon {
if s.config.IsBeacon {
// beacon chain node
err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Beacon}, regMsgBuf)
} else {
// non-beacon chain node, reg as client node
err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Beacon}, clientMsgBuf)
}
} else {
// The following logical will be used for 2nd stage peer discovery process
if s.config.Group == p2p.GroupIDUnknown {
continue
}
if s.config.IsClient {
// client node of reg shard, such as wallet/txgen
err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Group}, clientMsgBuf)
} else {
// regular node of a shard
err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Group}, regMsgBuf)
}
}
if err != nil {
utils.GetLogInstance().Error("Failed to send ping message", "group", g)
} else {
utils.GetLogInstance().Info("[DISCOVERY]", "Sent Ping Message", g)
}
}
}
}
}
@ -86,27 +140,5 @@ func (s *Service) contactP2pPeers() {
// Init is to initialize for discoveryService.
func (s *Service) Init() {
log.Info("Init discovery service")
}
func (s *Service) pingPeer() {
tick := time.NewTicker(5 * time.Second)
ping := proto_discovery.NewPingMessage(s.host.GetSelfPeer())
buffer := ping.ConstructPingMessage()
content := host.ConstructP2pMessage(byte(0), buffer)
for {
select {
case <-tick.C:
err := s.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, content)
if err != nil {
log.Error("Failed to send ping message", "group", p2p.GroupIDBeacon)
}
case <-s.stopChan:
log.Info("Stop sending ping message")
return
}
}
// s.host.SendMessage(peer, content)
// log.Debug("Sent Ping Message via unicast to", "peer", peer)
utils.GetLogInstance().Info("Init discovery service")
}

@ -244,3 +244,8 @@ func (s *Service) GetExplorerAddress(w http.ResponseWriter, r *http.Request) {
data.Address = address
json.NewEncoder(w).Encode(data.Address)
}
// NotifyService notify service
func (s *Service) NotifyService(params map[string]interface{}) {
return
}

@ -14,6 +14,7 @@ type ActionType byte
const (
Start ActionType = iota
Stop
Notify
)
// Type is service type.
@ -73,13 +74,14 @@ const (
type Action struct {
Action ActionType
ServiceType Type
params map[string]interface{}
Params map[string]interface{}
}
// Interface is the collection of functions any service needs to implement.
type Interface interface {
StartService()
StopService()
NotifyService(map[string]interface{})
}
// Manager stores all services for service manager.
@ -136,6 +138,9 @@ func (m *Manager) TakeAction(action *Action) {
case Stop:
fmt.Printf("Stop %s\n", action.ServiceType)
service.StopService()
case Notify:
fmt.Printf("Notify %s\n", action.ServiceType)
service.NotifyService(action.Params)
}
}
}

@ -16,6 +16,10 @@ func (s *SupportSyncingTest) StopService() {
fmt.Println("SupportSyncingTest stopping")
}
func (s *SupportSyncingTest) NotifyService(data map[string]interface{}) {
fmt.Println("SupportSyncingTest being notified")
}
// Test TakeAction.
func TestTakeAction(t *testing.T) {
m := &Manager{}

@ -19,7 +19,7 @@ import (
// Service is the network info service.
type Service struct {
Host p2p.Host
Rendezvous string
Rendezvous p2p.GroupID
dht *libp2pdht.IpfsDHT
ctx context.Context
cancel context.CancelFunc
@ -31,7 +31,7 @@ type Service struct {
}
// New returns role conversion service.
func New(h p2p.Host, rendezvous string, peerChan chan p2p.Peer) *Service {
func New(h p2p.Host, rendezvous p2p.GroupID, peerChan chan p2p.Peer) *Service {
timeout := 30 * time.Minute
ctx, cancel := context.WithTimeout(context.Background(), timeout)
dht, err := libp2pdht.New(ctx, h.GetP2PHost())
@ -86,7 +86,7 @@ func (s *Service) Init() error {
// We use a rendezvous point "shardID" to announce our location.
utils.GetLogInstance().Info("Announcing ourselves...")
s.discovery = libp2pdis.NewRoutingDiscovery(s.dht)
libp2pdis.Advertise(s.ctx, s.discovery, s.Rendezvous)
libp2pdis.Advertise(s.ctx, s.discovery, string(s.Rendezvous))
utils.GetLogInstance().Info("Successfully announced!")
return nil
@ -96,7 +96,7 @@ func (s *Service) Init() error {
func (s *Service) Run() {
defer close(s.stoppedChan)
var err error
s.peerInfo, err = s.discovery.FindPeers(s.ctx, s.Rendezvous)
s.peerInfo, err = s.discovery.FindPeers(s.ctx, string(s.Rendezvous))
if err != nil {
utils.GetLogInstance().Error("FindPeers", "error", err)
}
@ -150,3 +150,8 @@ func (s *Service) StopService() {
<-s.stoppedChan
utils.GetLogInstance().Info("Network info service stopped.")
}
// NotifyService notify service
func (s *Service) NotifyService(params map[string]interface{}) {
return
}

@ -31,3 +31,8 @@ func (s *Service) StopService() {
<-s.stoppedChan
utils.GetLogInstance().Info("Random generation stopped.")
}
// NotifyService notify service
func (s *Service) NotifyService(params map[string]interface{}) {
return
}

@ -56,3 +56,8 @@ func (s *Service) StopService() {
<-s.stoppedChan
utils.GetLogInstance().Info("Role conversion stopped.")
}
// NotifyService notify service
func (s *Service) NotifyService(params map[string]interface{}) {
return
}

@ -115,3 +115,8 @@ func (s *Service) StopService() {
<-s.stoppedChan
utils.GetLogInstance().Info("Role conversion stopped.")
}
// NotifyService notify service
func (s *Service) NotifyService(params map[string]interface{}) {
return
}

@ -255,13 +255,14 @@ func main() {
} else {
currentNode.Role = node.BeaconValidator
}
currentNode.MyShardGroupID = p2p.GroupIDBeacon
} else {
if role == "leader" {
currentNode.Role = node.ShardLeader
} else {
currentNode.Role = node.ShardValidator
}
currentNode.MyShardGroupID = p2p.GroupIDUnknown
}
// Add randomness protocol

@ -52,7 +52,7 @@ type Consensus struct {
MinPeers int
// Leader's address
leader p2p.Peer
Leader p2p.Peer
// Public keys of the committee including leader and validators
PublicKeys []*bls.PublicKey
@ -152,7 +152,7 @@ func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Cons
consensus.IsLeader = false
}
consensus.leader = leader
consensus.Leader = leader
for _, peer := range peers {
consensus.validators.Store(utils.GetUniqueIDFromPeer(peer), peer)
}
@ -169,8 +169,8 @@ func New(host p2p.Host, ShardID string, peers []p2p.Peer, leader p2p.Peer) *Cons
consensus.PublicKeys = allPublicKeys
prepareBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.PubKey)
commitBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.PubKey)
prepareBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.Leader.PubKey)
commitBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.Leader.PubKey)
consensus.prepareBitmap = prepareBitmap
consensus.commitBitmap = commitBitmap
@ -227,6 +227,7 @@ func (consensus *Consensus) checkConsensusMessage(message consensus_proto.Messag
consensusID := message.ConsensusId
blockHash := message.BlockHash
utils.GetLogInstance().Warn("checkConsensusMessage", "publicKey", publicKey)
// Verify message signature
err := verifyMessageSig(publicKey, message)
if err != nil {
@ -277,6 +278,7 @@ func verifyMessageSig(signerPubKey *bls.PublicKey, message consensus_proto.Messa
return err
}
msgHash := sha256.Sum256(messageBytes)
utils.GetLogInstance().Debug("verifyMessageSig", "signerPubKey", signerPubKey, "msgHash", msgHash)
if !msgSig.VerifyHash(signerPubKey, msgHash[:]) {
return errors.New("failed to verify the signature")
}
@ -349,8 +351,8 @@ func (consensus *Consensus) ResetState() {
consensus.prepareSigs = map[uint32]*bls.Sign{}
consensus.commitSigs = map[uint32]*bls.Sign{}
prepareBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.PubKey)
commitBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.PubKey)
prepareBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.Leader.PubKey)
commitBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.Leader.PubKey)
consensus.prepareBitmap = prepareBitmap
consensus.commitBitmap = commitBitmap
@ -438,11 +440,11 @@ func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int {
// Or the shard won't be able to reach consensus if public keys are mismatch
validators := consensus.GetValidatorPeers()
pong := proto_discovery.NewPongMessage(validators, consensus.PublicKeys)
pong := proto_discovery.NewPongMessage(validators, consensus.PublicKeys, consensus.Leader.PubKey)
buffer := pong.ConstructPongMessage()
if utils.UseLibP2P {
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, buffer)
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), buffer))
} else {
host.BroadcastMessageFromLeader(consensus.host, validators, buffer, consensus.OfflinePeers)
}

@ -30,12 +30,15 @@ var (
)
// WaitForNewBlock waits for the next new block to run consensus on
func (consensus *Consensus) WaitForNewBlock(blockChannel chan *types.Block, stopChan chan struct{}, stoppedChan chan struct{}) {
func (consensus *Consensus) WaitForNewBlock(blockChannel chan *types.Block, stopChan chan struct{}, stoppedChan chan struct{}, startChannel chan struct{}) {
go func() {
defer close(stoppedChan)
for {
select {
default:
// got the signal to start consensus
_ = <-startChannel
utils.GetLogInstance().Debug("Waiting for block", "consensus", consensus)
// keep waiting for new blocks
newBlock := <-blockChannel
@ -58,7 +61,7 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan *types.Block, stop
utils.GetLogInstance().Debug("[DRG] GOT pRnd", "pRnd", pRndAndBitmap)
pRnd := pRndAndBitmap[:32]
bitmap := pRndAndBitmap[32:]
vrfBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.leader.PubKey)
vrfBitmap, _ := bls_cosi.NewMask(consensus.PublicKeys, consensus.Leader.PubKey)
vrfBitmap.SetMask(bitmap)
// TODO: check validity of pRnd
@ -145,6 +148,11 @@ func (consensus *Consensus) processPrepareMessage(message consensus_proto.Messag
validatorPeer := consensus.getValidatorPeerByID(validatorID)
if validatorPeer == nil {
utils.GetLogInstance().Error("Invalid validator", "validatorID", validatorID)
return
}
if err := consensus.checkConsensusMessage(message, validatorPeer.PubKey); err != nil {
utils.GetLogInstance().Debug("Failed to check the validator message", "validatorID", validatorID)
return
@ -212,6 +220,11 @@ func (consensus *Consensus) processCommitMessage(message consensus_proto.Message
validatorPeer := consensus.getValidatorPeerByID(validatorID)
if validatorPeer == nil {
utils.GetLogInstance().Error("Invalid validator", "validatorID", validatorID)
return
}
if err := consensus.checkConsensusMessage(message, validatorPeer.PubKey); err != nil {
utils.GetLogInstance().Debug("Failed to check the validator message", "validatorID", validatorID)
return
@ -287,7 +300,7 @@ func (consensus *Consensus) processCommitMessage(message consensus_proto.Message
consensus.reportMetrics(blockObj)
// Dump new block into level db.
explorer.GetStorageInstance(consensus.leader.IP, consensus.leader.Port, true).Dump(&blockObj, consensus.consensusID)
explorer.GetStorageInstance(consensus.Leader.IP, consensus.Leader.Port, true).Dump(&blockObj, consensus.consensusID)
// Reset state to Finished, and clear other data.
consensus.ResetState()

@ -31,7 +31,7 @@ func TestNew(test *testing.T) {
test.Error("Consensus ReadySignal should be initialized")
}
if consensus.leader.IP != leader.IP || consensus.leader.Port != leader.Port {
if consensus.Leader.IP != leader.IP || consensus.Leader.Port != leader.Port {
test.Error("Consensus Leader is set to wrong Peer")
}
}

@ -75,7 +75,7 @@ func (consensus *Consensus) processAnnounceMessage(message consensus_proto.Messa
copy(consensus.blockHash[:], blockHash[:])
consensus.block = block
if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil {
if err := consensus.checkConsensusMessage(message, consensus.Leader.PubKey); err != nil {
utils.GetLogInstance().Debug("Failed to check the leader message")
if err == consensus_engine.ErrConsensusIDNotMatch {
utils.GetLogInstance().Debug("sending bft block to state syncing")
@ -109,7 +109,7 @@ func (consensus *Consensus) processAnnounceMessage(message consensus_proto.Messa
if utils.UseLibP2P {
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend))
} else {
consensus.SendMessage(consensus.leader, msgToSend)
consensus.SendMessage(consensus.Leader, msgToSend)
}
consensus.state = PrepareDone
@ -137,7 +137,7 @@ func (consensus *Consensus) processPreparedMessage(message consensus_proto.Messa
// Update readyByConsensus for attack.
attack.GetInstance().UpdateConsensusReady(consensusID)
if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil {
if err := consensus.checkConsensusMessage(message, consensus.Leader.PubKey); err != nil {
utils.GetLogInstance().Debug("processPreparedMessage error", "error", err)
return
}
@ -173,7 +173,7 @@ func (consensus *Consensus) processPreparedMessage(message consensus_proto.Messa
if utils.UseLibP2P {
consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend))
} else {
consensus.SendMessage(consensus.leader, msgToSend)
consensus.SendMessage(consensus.Leader, msgToSend)
}
consensus.state = CommitDone
@ -200,7 +200,7 @@ func (consensus *Consensus) processCommittedMessage(message consensus_proto.Mess
// Update readyByConsensus for attack.
attack.GetInstance().UpdateConsensusReady(consensusID)
if err := consensus.checkConsensusMessage(message, consensus.leader.PubKey); err != nil {
if err := consensus.checkConsensusMessage(message, consensus.Leader.PubKey); err != nil {
utils.GetLogInstance().Debug("processCommittedMessage error", "error", err)
return
}

@ -25,6 +25,7 @@ import (
clientService "github.com/harmony-one/harmony/api/client/service"
proto_discovery "github.com/harmony-one/harmony/api/proto/discovery"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/api/service"
service_manager "github.com/harmony-one/harmony/api/service"
blockproposal "github.com/harmony-one/harmony/api/service/blockproposal"
"github.com/harmony-one/harmony/api/service/clientsupport"
@ -198,6 +199,12 @@ type Node struct {
// Duplicated Ping Message Received
duplicatedPing map[string]bool
// Channel to notify consensus service to really start consensus
startConsensus chan struct{}
// My GroupID
MyShardGroupID p2p.GroupID
}
// Blockchain returns the blockchain from node
@ -322,6 +329,8 @@ func New(host p2p.Host, consensus *bft.Consensus, db ethdb.Database) *Node {
node.duplicatedPing = make(map[string]bool)
node.startConsensus = make(chan struct{})
return &node
}
@ -652,7 +661,7 @@ func (node *Node) UpdateStakingList(block *types.Block) error {
for i := range txns {
txn := txns[i]
toAddress := txn.To()
if *toAddress != node.StakingContractAddress { //Not a address aimed at the staking contract.
if toAddress != nil && *toAddress != node.StakingContractAddress { //Not a address aimed at the staking contract.
continue
}
currentSender, _ := types.Sender(signerType, txn)
@ -708,10 +717,33 @@ func decodeFuncSign(data []byte) string {
}
func (node *Node) setupForShardLeader() {
chanPeer := make(chan p2p.Peer)
nodeConfig := service.NodeConfig{
IsBeacon: false,
IsClient: false,
Beacon: p2p.GroupIDBeacon,
Group: p2p.GroupIDUnknown,
Actions: make(map[p2p.GroupID]p2p.ActionType),
}
nodeConfig.Actions[p2p.GroupIDBeacon] = p2p.ActionStart
var err error
node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon)
if err != nil {
utils.GetLogInstance().Error("create group receiver error", "msg", err)
return
}
// Register peer discovery service. No need to do staking for beacon chain node.
node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer))
// Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer))
// Register explorer service.
node.serviceManager.RegisterService(service_manager.SupportExplorer, explorer.New(&node.SelfPeer))
// Register consensus service.
node.serviceManager.RegisterService(service_manager.Consensus, consensus_service.New(node.BlockChannel, node.Consensus))
node.serviceManager.RegisterService(service_manager.Consensus, consensus_service.New(node.BlockChannel, node.Consensus, node.startConsensus))
// Register new block service.
node.serviceManager.RegisterService(service_manager.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReady))
// Register client support service.
@ -721,10 +753,40 @@ func (node *Node) setupForShardLeader() {
}
func (node *Node) setupForShardValidator() {
chanPeer := make(chan p2p.Peer)
nodeConfig := service.NodeConfig{
IsBeacon: false,
IsClient: false,
Beacon: p2p.GroupIDBeacon,
Group: p2p.GroupIDUnknown,
Actions: make(map[p2p.GroupID]p2p.ActionType),
}
nodeConfig.Actions[p2p.GroupIDBeacon] = p2p.ActionStart
var err error
node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon)
if err != nil {
utils.GetLogInstance().Error("create group receiver error", "msg", err)
return
}
// Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node.
node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer))
// Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer))
}
func (node *Node) setupForBeaconLeader() {
chanPeer := make(chan p2p.Peer)
nodeConfig := service.NodeConfig{
IsBeacon: true,
IsClient: false,
Beacon: p2p.GroupIDBeacon,
Group: p2p.GroupIDUnknown,
Actions: make(map[p2p.GroupID]p2p.ActionType),
}
nodeConfig.Actions[p2p.GroupIDBeacon] = p2p.ActionStart
var err error
node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon)
@ -733,13 +795,13 @@ func (node *Node) setupForBeaconLeader() {
return
}
// Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node.
node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, "0", chanPeer, nil))
// Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, "0", chanPeer))
// Register peer discovery service. No need to do staking for beacon chain node.
node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer))
// Register networkinfo service.
node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer))
// Register consensus service.
node.serviceManager.RegisterService(service_manager.Consensus, consensus_service.New(node.BlockChannel, node.Consensus))
node.serviceManager.RegisterService(service_manager.Consensus, consensus_service.New(node.BlockChannel, node.Consensus, node.startConsensus))
// Register new block service.
node.serviceManager.RegisterService(service_manager.BlockProposal, blockproposal.New(node.Consensus.ReadySignal, node.WaitForConsensusReady))
// Register client support service.
@ -751,6 +813,14 @@ func (node *Node) setupForBeaconLeader() {
func (node *Node) setupForBeaconValidator() {
chanPeer := make(chan p2p.Peer)
nodeConfig := service.NodeConfig{
IsBeacon: true,
IsClient: false,
Beacon: p2p.GroupIDBeacon,
Group: p2p.GroupIDUnknown,
Actions: make(map[p2p.GroupID]p2p.ActionType),
}
nodeConfig.Actions[p2p.GroupIDBeacon] = p2p.ActionStart
var err error
node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon)
@ -759,10 +829,10 @@ func (node *Node) setupForBeaconValidator() {
return
}
// Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node.
node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, "0", chanPeer, nil))
// Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, "0", chanPeer))
// Register peer discovery service. No need to do staking for beacon chain node.
node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer))
// Register networkinfo service.
node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer))
// Register randomness service
node.serviceManager.RegisterService(service_manager.Randomness, randomness_service.New(node.DRand))
}
@ -771,6 +841,15 @@ func (node *Node) setupForNewNode() {
chanPeer := make(chan p2p.Peer)
stakingPeer := make(chan p2p.Peer)
nodeConfig := service.NodeConfig{
IsBeacon: false,
IsClient: false,
Beacon: p2p.GroupIDBeacon,
Group: p2p.GroupIDUnknown,
Actions: make(map[p2p.GroupID]p2p.ActionType),
}
nodeConfig.Actions[p2p.GroupIDBeacon] = p2p.ActionStart
var err error
node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon)
if err != nil {
@ -781,9 +860,9 @@ func (node *Node) setupForNewNode() {
// Register staking service.
node.serviceManager.RegisterService(service_manager.Staking, staking.New(node.AccountKey, 0, stakingPeer))
// Register peer discovery service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, "0", chanPeer, stakingPeer))
node.serviceManager.RegisterService(service_manager.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer))
// Register networkinfo service. "0" is the beacon shard ID
node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, "0", chanPeer))
node.serviceManager.RegisterService(service_manager.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer))
// TODO: how to restart networkinfo and discovery service after receiving shard id info from beacon chain?
}

@ -231,7 +231,7 @@ func (node *Node) BroadcastNewBlock(newBlock *types.Block) {
if node.ClientPeer != nil {
utils.GetLogInstance().Debug("Sending new block to client", "client", node.ClientPeer)
if utils.UseLibP2P {
node.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock}))
node.host.SendMessageToGroups([]p2p.GroupID{node.MyShardGroupID}, proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock}))
} else {
node.SendMessage(*node.ClientPeer, proto_node.ConstructBlocksSyncMessage([]*types.Block{newBlock}))
}
@ -261,6 +261,7 @@ func (node *Node) VerifyNewBlock(newBlock *types.Block) bool {
// 2. [leader] send new block to the client
func (node *Node) PostConsensusProcessing(newBlock *types.Block) {
if node.Role == BeaconLeader || node.Role == BeaconValidator {
utils.GetLogInstance().Info("PostConsensusProcessing", "newBlock", newBlock)
node.UpdateStakingList(newBlock)
}
if node.Consensus.IsLeader {
@ -338,7 +339,7 @@ func (node *Node) pingMessageHandler(msgPayload []byte, sender string) int {
// This is the old way of broadcasting pong message
if node.Consensus.IsLeader && !utils.UseLibP2P {
peers := node.Consensus.GetValidatorPeers()
pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys)
pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys, node.Consensus.Leader.PubKey)
buffer := pong.ConstructPongMessage()
// Send a Pong message directly to the sender
@ -386,19 +387,20 @@ func (node *Node) SendPongMessage() {
} else {
// stable number of peers/pubkeys, sent the pong message
if !sentMessage {
pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys)
pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys, node.Consensus.Leader.PubKey)
buffer := pong.ConstructPongMessage()
content := host.ConstructP2pMessage(byte(0), buffer)
err := node.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, content)
err := node.host.SendMessageToGroups([]p2p.GroupID{node.MyShardGroupID}, content)
if err != nil {
utils.GetLogInstance().Error("[PONG] failed to send pong message", "group", p2p.GroupIDBeacon)
utils.GetLogInstance().Error("[PONG] failed to send pong message", "group", node.MyShardGroupID)
continue
} else {
utils.GetLogInstance().Info("[PONG] sent pong message to", "group", p2p.GroupIDBeacon)
utils.GetLogInstance().Info("[PONG] sent pong message to", "group", node.MyShardGroupID)
}
sentMessage = true
// stop sending ping message
node.serviceManager.TakeAction(&service.Action{Action: service.Stop, ServiceType: service.PeerDiscovery})
node.startConsensus <- struct{}{}
}
}
numPeers = numPeersNow
@ -436,6 +438,12 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
node.AddPeers(peers)
}
node.Consensus.Leader.PubKey = &bls.PublicKey{}
err = node.Consensus.Leader.PubKey.Deserialize(pong.LeaderPubKey)
if err != nil {
utils.GetLogInstance().Error("Unmarshal Leader PubKey Failed", "error", err)
}
// Reset Validator PublicKeys every time we receive PONG message from Leader
// The PublicKeys has to be idential across the shard on every node
// TODO (lc): we need to handle RemovePeer situation
@ -452,7 +460,7 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
publicKeys = append(publicKeys, &key)
}
utils.GetLogInstance().Debug("[pongMessageHandler]", "#keys", len(publicKeys), "#peers", len(peers))
// utils.GetLogInstance().Debug("[pongMessageHandler]", "#keys", len(publicKeys), "#peers", len(peers))
if node.State == NodeWaitToJoin {
node.State = NodeReadyForConsensus
@ -463,6 +471,9 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
}
// Stop discovery service after received pong message
node.serviceManager.TakeAction(&service.Action{Action: service.Stop, ServiceType: service.PeerDiscovery})
data := make(map[string]interface{})
data["peer"] = p2p.GroupAction{Name: node.MyShardGroupID, Action: p2p.ActionPause}
node.serviceManager.TakeAction(&service.Action{Action: service.Notify, ServiceType: service.PeerDiscovery, Params: data})
return node.Consensus.UpdatePublicKeys(publicKeys)
}

@ -7,6 +7,7 @@ import (
"testing"
"time"
"github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/drand"
"github.com/ethereum/go-ethereum/common"
@ -148,7 +149,10 @@ func sendPongMessage(node *Node, leader p2p.Peer) {
PubKey: pubKey2,
}
pong1 := proto_discovery.NewPongMessage([]p2p.Peer{p1, p2}, nil)
pubKeys := []*bls.PublicKey{pubKey1, pubKey2}
leaderPubKey := pki.GetBLSPrivateKeyFromInt(888).GetPublicKey()
pong1 := proto_discovery.NewPongMessage([]p2p.Peer{p1, p2}, pubKeys, leaderPubKey)
buf1 := pong1.ConstructPongMessage()
fmt.Println("waiting for 10 seconds ...")

@ -24,10 +24,47 @@ func (id GroupID) String() string {
// Const of group ID
const (
GroupIDBeacon GroupID = "harmony/0.0.1/beacon"
GroupIDGlobal GroupID = "harmony/0.0.1/global"
GroupIDBeacon GroupID = "harmony/0.0.1/beacon"
GroupIDGlobal GroupID = "harmony/0.0.1/global"
GroupIDUnknown GroupID = "B1acKh0lE"
)
// ActionType lists action on group
type ActionType uint
// Const of different Action type
const (
ActionStart ActionType = iota
ActionPause
ActionResume
ActionStop
ActionUnknown
)
func (a ActionType) String() string {
switch a {
case ActionStart:
return "ActionStart"
case ActionPause:
return "ActionPause"
case ActionResume:
return "ActionResume"
case ActionStop:
return "ActionStop"
}
return "ActionUnknown"
}
// GroupAction specify action on corresponding group
type GroupAction struct {
Name GroupID
Action ActionType
}
func (g GroupAction) String() string {
return fmt.Sprintf("%s/%s", g.Name, g.Action)
}
// GroupReceiver is a multicast group message receiver interface.
type GroupReceiver interface {
// Close closes this receiver.

@ -3,7 +3,6 @@ package host
import (
"encoding/binary"
"net"
"runtime"
"time"
"github.com/ethereum/go-ethereum/log"
@ -27,13 +26,13 @@ func BroadcastMessage(h p2p.Host, peers []p2p.Peer, msg []byte, lostPeer chan p2
content := ConstructP2pMessage(byte(17), msg)
length := len(content)
log.Info("Start Broadcasting", "gomaxprocs", runtime.GOMAXPROCS(0), "Size", length)
start := time.Now()
// log.Info("Start Broadcasting", "gomaxprocs", runtime.GOMAXPROCS(0), "Size", length)
// start := time.Now()
for _, peer := range peers {
peerCopy := peer
go send(h, peerCopy, content, lostPeer)
}
log.Info("Broadcasting Done", "time spent(s)", time.Since(start).Seconds())
// log.Info("Broadcasting Done", "time spent(s)", time.Since(start).Seconds())
// Keep track of block propagation time
// Assume 1M message is for block propagation

Loading…
Cancel
Save