diff --git a/api/proto/discovery/pingpong.go b/api/proto/discovery/pingpong.go index fbed312d1..ccebd4fc2 100644 --- a/api/proto/discovery/pingpong.go +++ b/api/proto/discovery/pingpong.go @@ -30,6 +30,7 @@ type PingMessageType struct { // PongMessageType defines the data structure of the Pong message type PongMessageType struct { + ShardID uint32 Version uint16 // version of the protocol Peers []node.Info PubKeys [][]byte // list of publickKeys, has to be identical among all validators/leaders @@ -65,8 +66,9 @@ func NewPingMessage(peer p2p.Peer, isClient bool) *PingMessageType { } // NewPongMessage creates a new Pong message based on a list of p2p.Peer and a list of publicKeys -func NewPongMessage(peers []p2p.Peer, pubKeys []*bls.PublicKey, leaderKey *bls.PublicKey) *PongMessageType { +func NewPongMessage(peers []p2p.Peer, pubKeys []*bls.PublicKey, leaderKey *bls.PublicKey, shardID uint32) *PongMessageType { pong := new(PongMessageType) + pong.ShardID = shardID pong.PubKeys = make([][]byte, 0) pong.Version = proto.ProtocolVersion diff --git a/api/proto/discovery/pingpong_test.go b/api/proto/discovery/pingpong_test.go index a7c42d5a5..0257ab4cf 100644 --- a/api/proto/discovery/pingpong_test.go +++ b/api/proto/discovery/pingpong_test.go @@ -62,7 +62,7 @@ func TestString(test *testing.T) { test.Errorf("expect: %v, got: %v", e3, r3) } - pong1 := NewPongMessage(p2, pubKeys, leaderPubKey) + pong1 := NewPongMessage(p2, pubKeys, leaderPubKey, 0) r2 := fmt.Sprintf("%v", *pong1) if !strings.HasPrefix(r2, e2) { @@ -85,7 +85,7 @@ func TestSerialize(test *testing.T) { test.Error("Serialize/Deserialze Ping Message Failed") } - pong1 := NewPongMessage(p2, pubKeys, leaderPubKey) + pong1 := NewPongMessage(p2, pubKeys, leaderPubKey, 0) buf2 = pong1.ConstructPongMessage() msg2, err := proto.GetMessagePayload(buf2) diff --git a/api/service/config.go b/api/service/config.go index 4b3b3a3b2..84a3fd58a 100644 --- a/api/service/config.go +++ b/api/service/config.go @@ -1,8 +1,6 @@ package service import ( - "strconv" - nodeconfig "github.com/harmony-one/harmony/internal/configs/node" "github.com/harmony-one/harmony/p2p" ) @@ -13,35 +11,35 @@ import ( // cyclic imports type NodeConfig struct { // The three groupID design, please refer to https://github.com/harmony-one/harmony/blob/master/node/node.md#libp2p-integration - Beacon p2p.GroupID // the beacon group ID - Group p2p.GroupID // the group ID of the shard - Client p2p.GroupID // the client group ID of the shard - IsClient bool // whether this node is a client node, such as wallet/txgen - IsBeacon bool // whether this node is a beacon node or not - IsLeader bool // whether this node is a leader or not - ShardID uint32 // shardID of this node - Actions map[p2p.GroupID]p2p.ActionType // actions on the groups + Beacon p2p.GroupID // the beacon group ID + ShardGroupID p2p.GroupID // the group ID of the shard + Client p2p.GroupID // the client group ID of the shard + IsClient bool // whether this node is a client node, such as wallet/txgen + IsBeacon bool // whether this node is a beacon node or not + IsLeader bool // whether this node is a leader or not + ShardID uint32 // shardID of this node + Actions map[p2p.GroupID]p2p.ActionType // actions on the groups } -// GroupIDShards is a map of Group ID +// GroupIDShards is a map of ShardGroupID ID // key is the shard ID // value is the corresponding group ID var ( - GroupIDShards map[p2p.ShardIDType]p2p.GroupID - GroupIDShardClients map[p2p.ShardIDType]p2p.GroupID + GroupIDShards map[p2p.ShardID]p2p.GroupID + GroupIDShardClients map[p2p.ShardID]p2p.GroupID ) func init() { - GroupIDShards = make(map[p2p.ShardIDType]p2p.GroupID) - GroupIDShardClients = make(map[p2p.ShardIDType]p2p.GroupID) + GroupIDShards = make(map[p2p.ShardID]p2p.GroupID) + GroupIDShardClients = make(map[p2p.ShardID]p2p.GroupID) // init beacon chain group IDs - GroupIDShards["0"] = p2p.GroupIDBeacon - GroupIDShardClients["0"] = p2p.GroupIDBeaconClient + GroupIDShards[0] = p2p.GroupIDBeacon + GroupIDShardClients[0] = p2p.GroupIDBeaconClient for i := 1; i < nodeconfig.MaxShards; i++ { - sid := p2p.ShardIDType(strconv.Itoa(i)) - GroupIDShards[sid] = p2p.NewGroupIDShard(sid) - GroupIDShardClients[sid] = p2p.NewGroupIDShardClient(sid) + sid := p2p.ShardID(i) + GroupIDShards[sid] = p2p.NewGroupIDByShardID(sid) + GroupIDShardClients[sid] = p2p.NewClientGroupIDByShardID(sid) } } diff --git a/api/service/discovery/service.go b/api/service/discovery/service.go index bc42253da..e30c28ffe 100644 --- a/api/service/discovery/service.go +++ b/api/service/discovery/service.go @@ -76,7 +76,7 @@ func (s *Service) contactP2pPeers() { pingMsg := proto_discovery.NewPingMessage(s.host.GetSelfPeer(), s.config.IsClient) msgBuf := host.ConstructP2pMessage(byte(0), pingMsg.ConstructPingMessage()) - s.sentPingMessage(p2p.GroupIDBeacon, msgBuf) + s.sentPingMessage(s.config.ShardGroupID, msgBuf) for { select { @@ -123,14 +123,13 @@ func (s *Service) sentPingMessage(g p2p.GroupID, msgBuf []byte) { var err error if g == p2p.GroupIDBeacon || g == p2p.GroupIDBeaconClient { err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Beacon}, msgBuf) - } else { // The following logical will be used for 2nd stage peer discovery process // do nothing when the groupID is unknown - if s.config.Group == p2p.GroupIDUnknown { + if s.config.ShardGroupID == p2p.GroupIDUnknown { return } - err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Group}, msgBuf) + err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.ShardGroupID}, msgBuf) } if err != nil { utils.GetLogInstance().Error("Failed to send ping message", "group", g) diff --git a/api/service/manager_test.go b/api/service/manager_test.go index 851e5df78..346181756 100644 --- a/api/service/manager_test.go +++ b/api/service/manager_test.go @@ -99,8 +99,8 @@ func TestStopServices(t *testing.T) { } func TestInit(t *testing.T) { - if GroupIDShards[p2p.ShardIDType("0")] != p2p.GroupIDBeacon { - t.Errorf("GroupIDShards[0]: %v != GroupIDBeacon: %v", GroupIDShards[p2p.ShardIDType("0")], p2p.GroupIDBeacon) + if GroupIDShards[p2p.ShardID(0)] != p2p.GroupIDBeacon { + t.Errorf("GroupIDShards[0]: %v != GroupIDBeacon: %v", GroupIDShards[p2p.ShardID(0)], p2p.GroupIDBeacon) } if len(GroupIDShards) != nodeconfig.MaxShards { t.Errorf("len(GroupIDShards): %v != TotalShards: %v", len(GroupIDShards), nodeconfig.MaxShards) diff --git a/api/service/networkinfo/service.go b/api/service/networkinfo/service.go index 6c320de29..d88dc117b 100644 --- a/api/service/networkinfo/service.go +++ b/api/service/networkinfo/service.go @@ -89,11 +89,12 @@ func (s *Service) Init() error { // thread that will refresh the peer table every five minutes. utils.GetLogInstance().Debug("Bootstrapping the DHT") if err := s.dht.Bootstrap(s.ctx); err != nil { - return fmt.Errorf("error bootstrap dht") + return fmt.Errorf("error bootstrap dht: %s", err) } var wg sync.WaitGroup if s.bootnodes == nil { + // TODO: should've passed in bootnodes through constructor. s.bootnodes = utils.BootNodes } @@ -123,7 +124,7 @@ func (s *Service) Init() error { } // We use a rendezvous point "shardID" to announce our location. - utils.GetLogInstance().Info("Announcing ourselves...") + utils.GetLogInstance().Info("Announcing ourselves...", "Rendezvous", string(s.Rendezvous)) s.discovery = libp2pdis.NewRoutingDiscovery(s.dht) libp2pdis.Advertise(s.ctx, s.discovery, string(s.Rendezvous)) utils.GetLogInstance().Info("Successfully announced!") @@ -194,7 +195,7 @@ func (s *Service) DoService() { return case <-tick.C: libp2pdis.Advertise(s.ctx, s.discovery, string(s.Rendezvous)) - utils.GetLogInstance().Info("Successfully announced!") + utils.GetLogInstance().Info("Successfully announced!", "Rendezvous", string(s.Rendezvous)) } } } diff --git a/cmd/client/txgen/main.go b/cmd/client/txgen/main.go index e87b3242d..1e5097e45 100644 --- a/cmd/client/txgen/main.go +++ b/cmd/client/txgen/main.go @@ -9,6 +9,8 @@ import ( "sync" "time" + "github.com/harmony-one/harmony/core" + "github.com/harmony-one/harmony/crypto/bls" "github.com/ethereum/go-ethereum/log" @@ -88,12 +90,14 @@ func main() { selfPeer := p2p.Peer{IP: *ip, Port: *port, ConsensusPubKey: peerPubKey} // Init with LibP2P enabled, FIXME: (leochen) right now we support only one shard - shardIDs = append(shardIDs, 0) + for i := 0; i < core.GenesisShardNum; i++ { + shardIDs = append(shardIDs, uint32(i)) + } // Do cross shard tx if there are more than one shard setting := txgen.Settings{ NumOfAddress: 10000, - CrossShard: len(shardIDs) > 1, + CrossShard: false, // len(shardIDs) > 1, MaxNumTxsPerBatch: *maxNumTxsPerBatch, CrossShardRatio: *crossShardRatio, } @@ -128,14 +132,14 @@ func main() { // This func is used to update the client's blockchain when new blocks are received from the leaders updateBlocksFunc := func(blocks []*types.Block) { - utils.GetLogInstance().Info("[Txgen] Received new block", "block", blocks) + utils.GetLogInstance().Info("[Txgen] Received new block", "block num", blocks[0].NumberU64()) for _, block := range blocks { for _, node := range nodes { shardID := block.ShardID() if node.Consensus.ShardID == shardID { // Add it to blockchain - utils.GetLogInstance().Info("Current Block", "hash", node.Blockchain().CurrentBlock().Hash().Hex()) + utils.GetLogInstance().Info("Current Block", "block num", node.Blockchain().CurrentBlock().NumberU64()) utils.GetLogInstance().Info("Adding block from leader", "txNum", len(block.Transactions()), "shardID", shardID, "preHash", block.ParentHash().Hex()) node.AddNewBlock(block) stateMutex.Lock() @@ -151,6 +155,7 @@ func main() { clientNode.Client.UpdateBlocks = updateBlocksFunc clientNode.NodeConfig.SetRole(nodeconfig.ClientNode) + clientNode.NodeConfig.SetIsClient(true) clientNode.ServiceManagerSetup() clientNode.RunServices() diff --git a/cmd/harmony/main.go b/cmd/harmony/main.go index 2b86d680d..e230ac482 100644 --- a/cmd/harmony/main.go +++ b/cmd/harmony/main.go @@ -9,6 +9,8 @@ import ( "runtime" "time" + "github.com/harmony-one/harmony/core" + "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" @@ -31,7 +33,7 @@ var ( commit string ) -// InitLDBDatabase initializes a LDBDatabase. isBeacon=true will return the beacon chain database for normal shard nodes +// InitLDBDatabase initializes a LDBDatabase. isGenesis=true will return the beacon chain database for normal shard nodes func InitLDBDatabase(ip string, port string, freshDB bool, isBeacon bool) (*ethdb.LDBDatabase, error) { var dbFileName string if isBeacon { @@ -81,8 +83,8 @@ var ( stakingKeyFile = flag.String("staking_key", "./.stakingkey", "the private key file of the harmony node") // Key file to store the private key keyFile = flag.String("key", "./.hmykey", "the private key file of the harmony node") - // isBeacon indicates this node is a beacon chain node - isBeacon = flag.Bool("is_beacon", false, "true means this node is a beacon chain node") + // isGenesis indicates this node is a genesis node + isGenesis = flag.Bool("is_genesis", false, "true means this node is a genesis node") // isArchival indicates this node is an archival node that will save and archive current blockchain isArchival = flag.Bool("is_archival", false, "true means this node is a archival node") //isNewNode indicates this node is a new node @@ -119,18 +121,24 @@ func initSetup() { func createGlobalConfig() *nodeconfig.ConfigType { var err error - nodeConfig := nodeconfig.GetGlobalConfig() - // Currently we hardcode only one shard. - nodeConfig.ShardID = 0 + nodeConfig := nodeconfig.GetDefaultConfig() + + shardID := uint32(*accountIndex / core.GenesisShardSize) + if !*isNewNode { + nodeConfig = nodeconfig.GetShardConfig(shardID) + } + + // The initial genesis nodes are sequentially put into genesis shards based on their accountIndex + nodeConfig.ShardID = shardID // Key Setup ================= [Start] // Staking private key is the ecdsa key used for token related transaction signing (especially the staking txs). stakingPriKey := "" consensusPriKey := &bls.SecretKey{} - if *isBeacon { - stakingPriKey = contract.InitialBeaconChainAccounts[*accountIndex].Private - err := consensusPriKey.SetHexString(contract.InitialBeaconChainBLSAccounts[*accountIndex].Private) + if *isGenesis { + stakingPriKey = contract.GenesisAccounts[*accountIndex].Private + err := consensusPriKey.SetHexString(contract.GenesisBLSAccounts[*accountIndex].Private) if err != nil { panic(fmt.Errorf("generate key error")) } @@ -159,14 +167,15 @@ func createGlobalConfig() *nodeconfig.ConfigType { if nodeConfig.MainDB, err = InitLDBDatabase(*ip, *port, *freshDB, false); err != nil { panic(err) } - if !*isBeacon { + if !*isGenesis { if nodeConfig.BeaconDB, err = InitLDBDatabase(*ip, *port, *freshDB, true); err != nil { panic(err) } } nodeConfig.SelfPeer = p2p.Peer{IP: *ip, Port: *port, ConsensusPubKey: nodeConfig.ConsensusPubKey} - if *isLeader { + + if *accountIndex%core.GenesisShardSize == 0 { // The first node in a shard is the leader at genesis nodeConfig.StringRole = "leader" nodeConfig.Leader = nodeConfig.SelfPeer } else { @@ -202,22 +211,33 @@ func setUpConsensusAndNode(nodeConfig *nodeconfig.ConfigType) (*consensus.Consen // TODO: refactor the creation of blockchain out of node.New() consensus.ChainReader = currentNode.Blockchain() - // TODO: need change config file and use switch instead of complicated "if else" condition - if *isBeacon { - if nodeConfig.StringRole == "leader" { - currentNode.NodeConfig.SetRole(nodeconfig.BeaconLeader) - currentNode.NodeConfig.SetIsLeader(true) + if *isGenesis { + // TODO: need change config file and use switch instead of complicated "if else" condition + if nodeConfig.ShardID == 0 { // Beacon chain + if nodeConfig.StringRole == "leader" { + currentNode.NodeConfig.SetRole(nodeconfig.BeaconLeader) + currentNode.NodeConfig.SetIsLeader(true) + } else { + currentNode.NodeConfig.SetRole(nodeconfig.BeaconValidator) + currentNode.NodeConfig.SetIsLeader(false) + } + currentNode.NodeConfig.SetShardGroupID(p2p.GroupIDBeacon) } else { - currentNode.NodeConfig.SetRole(nodeconfig.BeaconValidator) - currentNode.NodeConfig.SetIsLeader(false) + if nodeConfig.StringRole == "leader" { + currentNode.NodeConfig.SetRole(nodeconfig.ShardLeader) + currentNode.NodeConfig.SetIsLeader(true) + } else { + currentNode.NodeConfig.SetRole(nodeconfig.ShardValidator) + currentNode.NodeConfig.SetIsLeader(false) + } + currentNode.NodeConfig.SetShardGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(nodeConfig.ShardID))) } - currentNode.NodeConfig.SetShardGroupID(p2p.GroupIDBeacon) - currentNode.NodeConfig.SetIsBeacon(true) } else { currentNode.AddBeaconChainDatabase(nodeConfig.BeaconDB) if *isNewNode { currentNode.NodeConfig.SetRole(nodeconfig.NewNode) + // TODO: fix the roles as it's unknown before resharding. } else if nodeConfig.StringRole == "leader" { currentNode.NodeConfig.SetRole(nodeconfig.ShardLeader) currentNode.NodeConfig.SetIsLeader(true) @@ -226,14 +246,13 @@ func setUpConsensusAndNode(nodeConfig *nodeconfig.ConfigType) (*consensus.Consen currentNode.NodeConfig.SetIsLeader(false) } currentNode.NodeConfig.SetShardGroupID(p2p.GroupIDUnknown) - currentNode.NodeConfig.SetIsBeacon(false) } // Add randomness protocol // TODO: enable drand only for beacon chain // TODO: put this in a better place other than main. // TODO(minhdoan): During refactoring, found out that the peers list is actually empty. Need to clean up the logic of drand later. - dRand := drand.New(nodeConfig.Host, nodeConfig.ShardID, []p2p.Peer{}, nodeConfig.Leader, currentNode.ConfirmedBlockChannel, *isLeader, nodeConfig.ConsensusPriKey) + dRand := drand.New(nodeConfig.Host, nodeConfig.ShardID, []p2p.Peer{}, nodeConfig.Leader, currentNode.ConfirmedBlockChannel, nodeConfig.ConsensusPriKey) currentNode.Consensus.RegisterPRndChannel(dRand.PRndChannel) currentNode.Consensus.RegisterRndChannel(dRand.RndChannel) currentNode.DRand = dRand diff --git a/consensus/consensus.go b/consensus/consensus.go index b3c760cea..5d4cca2fd 100644 --- a/consensus/consensus.go +++ b/consensus/consensus.go @@ -461,10 +461,10 @@ func (consensus *Consensus) RemovePeers(peers []p2p.Peer) int { // Or the shard won't be able to reach consensus if public keys are mismatch validators := consensus.GetValidatorPeers() - pong := proto_discovery.NewPongMessage(validators, consensus.PublicKeys, consensus.leader.ConsensusPubKey) + pong := proto_discovery.NewPongMessage(validators, consensus.PublicKeys, consensus.leader.ConsensusPubKey, consensus.ShardID) buffer := pong.ConstructPongMessage() - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), buffer)) + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), buffer)) } return count2 diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 2bf95990b..79c965e60 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -59,7 +59,7 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan *types.Block, stop // Receive pRnd from DRG protocol utils.GetLogInstance().Debug("[DRG] Waiting for pRnd") pRndAndBitmap := <-consensus.PRndChannel - utils.GetLogInstance().Debug("[DRG] GOT pRnd", "pRnd", pRndAndBitmap) + utils.GetLogInstance().Debug("[DRG] Got pRnd", "pRnd", pRndAndBitmap) pRnd := [32]byte{} copy(pRnd[:], pRndAndBitmap[:32]) bitmap := pRndAndBitmap[32:] @@ -140,7 +140,7 @@ func (consensus *Consensus) startConsensus(newBlock *types.Block) { // Construct broadcast p2p message utils.GetLogInstance().Warn("[Consensus]", "sent announce message", len(msgToSend)) - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) } // processPrepareMessage processes the prepare message sent from validators @@ -213,7 +213,7 @@ func (consensus *Consensus) processPrepareMessage(message *msg_pb.Message) { consensus.aggregatedPrepareSig = aggSig utils.GetLogInstance().Warn("[Consensus]", "sent prepared message", len(msgToSend)) - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) // Set state to targetState consensus.state = targetState @@ -295,7 +295,7 @@ func (consensus *Consensus) processCommitMessage(message *msg_pb.Message) { consensus.aggregatedCommitSig = aggSig utils.GetLogInstance().Warn("[Consensus]", "sent committed message", len(msgToSend)) - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) var blockObj types.Block err := rlp.DecodeBytes(consensus.block, &blockObj) diff --git a/consensus/consensus_validator.go b/consensus/consensus_validator.go index 8ba9d4049..3763b9f34 100644 --- a/consensus/consensus_validator.go +++ b/consensus/consensus_validator.go @@ -95,7 +95,7 @@ func (consensus *Consensus) processAnnounceMessage(message *msg_pb.Message) { // Construct and send prepare message msgToSend := consensus.constructPrepareMessage() utils.GetLogInstance().Warn("[Consensus]", "sent prepare message", len(msgToSend)) - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) consensus.state = PrepareDone } @@ -165,7 +165,7 @@ func (consensus *Consensus) processPreparedMessage(message *msg_pb.Message) { multiSigAndBitmap := append(multiSig, bitmap...) msgToSend := consensus.constructCommitMessage(multiSigAndBitmap) utils.GetLogInstance().Warn("[Consensus]", "sent commit message", len(msgToSend)) - consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) + consensus.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(consensus.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) consensus.state = CommitDone } diff --git a/core/resharding.go b/core/resharding.go index 6cd9dd8da..baab65e5c 100644 --- a/core/resharding.go +++ b/core/resharding.go @@ -24,7 +24,7 @@ const ( // GenesisShardNum is the number of shard at genesis GenesisShardNum = 4 // GenesisShardSize is the size of each shard at genesis - GenesisShardSize = 50 + GenesisShardSize = 10 // CuckooRate is the percentage of nodes getting reshuffled in the second step of cuckoo resharding. CuckooRate = 0.1 ) @@ -198,24 +198,22 @@ func (ss *ShardingState) UpdateShardingState(stakeInfo *map[common.Address]*stru } // GetInitShardState returns the initial shard state at genesis. -// TODO: make the deploy.sh config file in sync with genesis constants. func GetInitShardState() types.ShardState { shardState := types.ShardState{} for i := 0; i < GenesisShardNum; i++ { com := types.Committee{ShardID: uint32(i)} - if i == 0 { - for j := 0; j < GenesisShardSize; j++ { - priKey := bls.SecretKey{} - priKey.SetHexString(contract.InitialBeaconChainBLSAccounts[j].Private) - addrBytes := priKey.GetPublicKey().GetAddress() - blsAddr := common.BytesToAddress(addrBytes[:]).Hex() - // TODO: directly read address for bls too - curNodeID := types.NodeID{contract.InitialBeaconChainAccounts[j].Address, blsAddr} - if j == 0 { - com.Leader = curNodeID - } - com.NodeList = append(com.NodeList, curNodeID) + for j := 0; j < GenesisShardSize; j++ { + index := i*GenesisShardNum + j // The initial account to use for genesis nodes + priKey := bls.SecretKey{} + priKey.SetHexString(contract.GenesisBLSAccounts[index].Private) + addrBytes := priKey.GetPublicKey().GetAddress() + blsAddr := common.BytesToAddress(addrBytes[:]).Hex() + // TODO: directly read address for bls too + curNodeID := types.NodeID{contract.GenesisAccounts[index].Address, blsAddr} + if j == 0 { + com.Leader = curNodeID } + com.NodeList = append(com.NodeList, curNodeID) } shardState = append(shardState, com) } diff --git a/drand/drand.go b/drand/drand.go index cb9ad9223..f30f0e847 100644 --- a/drand/drand.go +++ b/drand/drand.go @@ -65,7 +65,7 @@ type DRand struct { } // New creates a new dRand object -func New(host p2p.Host, ShardID uint32, peers []p2p.Peer, leader p2p.Peer, confirmedBlockChannel chan *types.Block, isLeader bool, blsPriKey *bls.SecretKey) *DRand { +func New(host p2p.Host, ShardID uint32, peers []p2p.Peer, leader p2p.Peer, confirmedBlockChannel chan *types.Block, blsPriKey *bls.SecretKey) *DRand { dRand := DRand{} dRand.host = host @@ -77,7 +77,11 @@ func New(host p2p.Host, ShardID uint32, peers []p2p.Peer, leader p2p.Peer, confi dRand.RndChannel = make(chan [64]byte) selfPeer := host.GetSelfPeer() - dRand.IsLeader = isLeader + if leader.Port == selfPeer.Port && leader.IP == selfPeer.IP { + dRand.IsLeader = true + } else { + dRand.IsLeader = false + } dRand.leader = leader for _, peer := range peers { diff --git a/drand/drand_leader.go b/drand/drand_leader.go index 218aee8dc..765898c39 100644 --- a/drand/drand_leader.go +++ b/drand/drand_leader.go @@ -78,7 +78,7 @@ func (dRand *DRand) init(epochBlock *types.Block) { (*dRand.vrfs)[dRand.SelfAddress] = append(rand[:], proof...) utils.GetLogInstance().Info("[DRG] sent init", "msg", msgToSend, "leader.PubKey", dRand.leader.ConsensusPubKey) - dRand.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) + dRand.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(dRand.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) } // ProcessMessageLeader dispatches messages for the leader to corresponding processors. @@ -100,6 +100,7 @@ func (dRand *DRand) ProcessMessageLeader(payload []byte) { // ProcessMessageValidator dispatches validator's consensus message. func (dRand *DRand) processCommitMessage(message *msg_pb.Message) { + utils.GetLogInstance().Error("[DRG] Leader received commit") if message.Type != msg_pb.MessageType_DRAND_COMMIT { utils.GetLogInstance().Error("Wrong message type received", "expected", msg_pb.MessageType_DRAND_COMMIT, "got", message.Type) return diff --git a/drand/drand_leader_msg_test.go b/drand/drand_leader_msg_test.go index d6f643cc4..1a3e8ab6c 100644 --- a/drand/drand_leader_msg_test.go +++ b/drand/drand_leader_msg_test.go @@ -21,7 +21,7 @@ func TestConstructInitMessage(test *testing.T) { if err != nil { test.Fatalf("newhost failure: %v", err) } - dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, true, bls.RandPrivateKey()) + dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, bls.RandPrivateKey()) dRand.blockHash = [32]byte{} msg := dRand.constructInitMessage() @@ -43,7 +43,7 @@ func TestProcessCommitMessage(test *testing.T) { if err != nil { test.Fatalf("newhost failure: %v", err) } - dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, true, bls.RandPrivateKey()) + dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, bls.RandPrivateKey()) dRand.blockHash = [32]byte{} msg := dRand.constructCommitMessage([32]byte{}, []byte{}) diff --git a/drand/drand_test.go b/drand/drand_test.go index edd6df606..2071ade43 100644 --- a/drand/drand_test.go +++ b/drand/drand_test.go @@ -24,7 +24,7 @@ func TestNew(test *testing.T) { if err != nil { test.Fatalf("newhost failure: %v", err) } - dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, true, bls2.RandPrivateKey()) + dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, bls2.RandPrivateKey()) if !dRand.IsLeader { test.Error("dRand should belong to a leader") @@ -39,7 +39,7 @@ func TestGetValidatorPeers(test *testing.T) { if err != nil { test.Fatalf("newhost failure: %v", err) } - dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, true, bls2.RandPrivateKey()) + dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, bls2.RandPrivateKey()) if !dRand.IsLeader { test.Error("dRand should belong to a leader") @@ -60,7 +60,7 @@ func TestAddPeers(test *testing.T) { if err != nil { test.Fatalf("newhost failure: %v", err) } - dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, true, bls2.RandPrivateKey()) + dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, bls2.RandPrivateKey()) if !dRand.IsLeader { test.Error("dRand should belong to a leader") @@ -89,7 +89,7 @@ func TestGetValidatorByPeerId(test *testing.T) { if err != nil { test.Fatalf("newhost failure: %v", err) } - dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, true, leaderPriKey) + dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, leaderPriKey) if !dRand.IsLeader { test.Error("dRand should belong to a leader") @@ -114,7 +114,7 @@ func TestResetState(test *testing.T) { if err != nil { test.Fatalf("newhost failure: %v", err) } - dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, true, bls2.RandPrivateKey()) + dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, bls2.RandPrivateKey()) dRand.ResetState() } @@ -126,7 +126,7 @@ func TestSetLeaderPubKey(test *testing.T) { if err != nil { test.Fatalf("newhost failure: %v", err) } - dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, true, bls2.RandPrivateKey()) + dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, bls2.RandPrivateKey()) _, newPublicKey, _ := utils.GenKeyP2P("127.0.0.1", "9902") newPublicKeyBytes, _ := newPublicKey.Bytes() @@ -143,7 +143,7 @@ func TestUpdatePublicKeys(test *testing.T) { if err != nil { test.Fatalf("newhost failure: %v", err) } - dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, true, bls2.RandPrivateKey()) + dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, bls2.RandPrivateKey()) pubKey1 := bls2.RandPrivateKey().GetPublicKey() pubKey2 := bls2.RandPrivateKey().GetPublicKey() @@ -169,7 +169,7 @@ func TestVerifyMessageSig(test *testing.T) { if err != nil { test.Fatalf("newhost failure: %v", err) } - dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, true, bls2.RandPrivateKey()) + dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, bls2.RandPrivateKey()) message := &msg_pb.Message{ ReceiverType: msg_pb.ReceiverType_VALIDATOR, @@ -198,7 +198,7 @@ func TestVrf(test *testing.T) { if err != nil { test.Fatalf("newhost failure: %v", err) } - dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, true, bls2.RandPrivateKey()) + dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, bls2.RandPrivateKey()) tx1 := types.NewTransaction(1, common.BytesToAddress([]byte{0x11}), 0, big.NewInt(111), 1111, big.NewInt(11111), []byte{0x11, 0x11, 0x11}) txs := []*types.Transaction{tx1} diff --git a/drand/drand_validator.go b/drand/drand_validator.go index 341b2e7d0..52cb68899 100644 --- a/drand/drand_validator.go +++ b/drand/drand_validator.go @@ -53,5 +53,5 @@ func (dRand *DRand) processInitMessage(message *msg_pb.Message) { msgToSend := dRand.constructCommitMessage(rand, proof) // Send the commit message back to leader - dRand.host.SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, host.ConstructP2pMessage(byte(17), msgToSend)) + dRand.host.SendMessageToGroups([]p2p.GroupID{p2p.NewGroupIDByShardID(p2p.ShardID(dRand.ShardID))}, host.ConstructP2pMessage(byte(17), msgToSend)) } diff --git a/drand/drand_validator_msg_test.go b/drand/drand_validator_msg_test.go index bc8e94baf..1d7a25720 100644 --- a/drand/drand_validator_msg_test.go +++ b/drand/drand_validator_msg_test.go @@ -21,7 +21,7 @@ func TestConstructCommitMessage(test *testing.T) { if err != nil { test.Fatalf("newhost failure: %v", err) } - dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, true, bls.RandPrivateKey()) + dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, bls.RandPrivateKey()) dRand.blockHash = [32]byte{} msg := dRand.constructCommitMessage([32]byte{}, []byte{}) msgPayload, _ := proto.GetDRandMessagePayload(msg) @@ -42,7 +42,7 @@ func TestProcessInitMessage(test *testing.T) { if err != nil { test.Fatalf("newhost failure: %v", err) } - dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, true, bls.RandPrivateKey()) + dRand := New(host, 0, []p2p.Peer{leader, validator}, leader, nil, bls.RandPrivateKey()) dRand.blockHash = [32]byte{} msg := dRand.constructInitMessage() diff --git a/internal/configs/node/config.go b/internal/configs/node/config.go index 83731c2e9..ddcba4153 100644 --- a/internal/configs/node/config.go +++ b/internal/configs/node/config.go @@ -5,6 +5,7 @@ package nodeconfig import ( "crypto/ecdsa" + "errors" "fmt" "sync" @@ -61,10 +62,9 @@ const ( type ConfigType struct { // The three groupID design, please refer to https://github.com/harmony-one/harmony/blob/master/node/node.md#libp2p-integration beacon p2p.GroupID // the beacon group ID - group p2p.GroupID // the group ID of the shard + group p2p.GroupID // the group ID of the shard (note: for beacon chain node, the beacon and shard group are the same) client p2p.GroupID // the client group ID of the shard isClient bool // whether this node is a client node, such as wallet/txgen - isBeacon bool // whether this node is a beacon node or not isLeader bool // whether this node is a leader or not ShardID uint32 // ShardID of this node role Role // Role of the node @@ -85,27 +85,40 @@ type ConfigType struct { // configs is a list of node configuration. // It has at least one configuration. // The first one is the default, global node configuration -var configs []ConfigType +var shardConfigs []ConfigType +var defaultConfig ConfigType var onceForConfigs sync.Once -// GetConfigs return the indexed ConfigType variable -func GetConfigs(index int) *ConfigType { +// GetShardConfig return the shard's ConfigType variable +func GetShardConfig(shardID uint32) *ConfigType { onceForConfigs.Do(func() { - configs = make([]ConfigType, MaxShards) + shardConfigs = make([]ConfigType, MaxShards) }) - if index > cap(configs) { + if int(shardID) >= cap(shardConfigs) { return nil } - return &configs[index] + return &shardConfigs[shardID] } -// GetGlobalConfig returns global config. -func GetGlobalConfig() *ConfigType { - return GetConfigs(Global) +// SetConfigs set ConfigType in the right index. +func SetConfigs(config ConfigType, shardID uint32) error { + onceForConfigs.Do(func() { + shardConfigs = make([]ConfigType, MaxShards) + }) + if int(shardID) >= cap(shardConfigs) { + return errors.New("Failed to set ConfigType") + } + shardConfigs[int(shardID)] = config + return nil +} + +// GetDefaultConfig returns default config. +func GetDefaultConfig() *ConfigType { + return &defaultConfig } func (conf *ConfigType) String() string { - return fmt.Sprintf("%s/%s/%s:%v,%v,%v:%v", conf.beacon, conf.group, conf.client, conf.isClient, conf.isBeacon, conf.isLeader, conf.ShardID) + return fmt.Sprintf("%s/%s/%s:%v,%v,%v:%v", conf.beacon, conf.group, conf.client, conf.isClient, conf.IsBeacon(), conf.isLeader, conf.ShardID) } // SetBeaconGroupID set the groupID for beacon group @@ -128,11 +141,6 @@ func (conf *ConfigType) SetIsClient(b bool) { conf.isClient = b } -// SetIsBeacon set the isBeacon configuration -func (conf *ConfigType) SetIsBeacon(b bool) { - conf.isBeacon = b -} - // SetIsLeader set the isLeader configuration func (conf *ConfigType) SetIsLeader(b bool) { conf.isLeader = b @@ -170,7 +178,7 @@ func (conf *ConfigType) IsClient() bool { // IsBeacon returns the isBeacon configuration func (conf *ConfigType) IsBeacon() bool { - return conf.isBeacon + return conf.ShardID == 0 } // IsLeader returns the isLeader configuration diff --git a/internal/configs/node/config_test.go b/internal/configs/node/config_test.go index b4460a910..f8c03be02 100644 --- a/internal/configs/node/config_test.go +++ b/internal/configs/node/config_test.go @@ -8,10 +8,10 @@ import ( func TestNodeConfigSingleton(t *testing.T) { // init 3 configs - _ = GetConfigs(2) + _ = GetShardConfig(2) // get the singleton variable - c := GetConfigs(Global) + c := GetShardConfig(Global) c.SetIsLeader(true) @@ -21,7 +21,7 @@ func TestNodeConfigSingleton(t *testing.T) { c.SetBeaconGroupID(p2p.GroupIDBeacon) - d := GetConfigs(Global) + d := GetShardConfig(Global) if !d.IsLeader() { t.Errorf("IsLeader = %v, expected = %v", d.IsLeader(), true) @@ -36,16 +36,15 @@ func TestNodeConfigSingleton(t *testing.T) { func TestNodeConfigMultiple(t *testing.T) { // init 3 configs - c := GetConfigs(2) - d := GetConfigs(1) - e := GetConfigs(0) - f := GetConfigs(42) + c := GetShardConfig(2) + d := GetShardConfig(1) + e := GetShardConfig(0) + f := GetShardConfig(42) if f != nil { t.Errorf("expecting nil, got: %v", f) } - c.SetIsBeacon(true) if c.IsBeacon() != true { t.Errorf("expecting true, got: %v", c.IsBeacon()) } diff --git a/internal/utils/contract/constants.go b/internal/utils/contract/constants.go index 685f3c249..9f3b036af 100644 --- a/internal/utils/contract/constants.go +++ b/internal/utils/contract/constants.go @@ -37,8 +37,8 @@ var GenesisBeaconAccountPublicKey = GenesisBeaconAccountPriKey.PublicKey // DeployedContractAddress is the deployed contract address of the staking smart contract in beacon chain. var DeployedContractAddress = crypto.CreateAddress(crypto.PubkeyToAddress(GenesisBeaconAccountPublicKey), uint64(0)) -// InitialBeaconChainAccounts are the ECSDA accounts for the initial beacon chain nodes. -var InitialBeaconChainAccounts = [...]DeployAccount{ +// GenesisAccounts are the ECSDA accounts for the initial genesis nodes. +var GenesisAccounts = [...]DeployAccount{ {Address: "0xE2bD4413172C98d5094B94de1A8AC6a383d68b84", Private: "e401343197a852f361e38ce6b46c99f1d6d1f80499864c6ae7effee42b46ab6b", Public: "0xE2bD4413172C98d5094B94de1A8AC6a383d68b84"}, {Address: "0x183418934Fd8A97c98E086151317B2df6259b8A8", Private: "a7d764439a7619f703c97ee2a2cf0be2cd62ad4c9deebd5423d6f28de417b907", Public: "0x183418934Fd8A97c98E086151317B2df6259b8A8"}, {Address: "0x9df0e70D4cb3E9beC0548D8Ac56F46596D1BcdB6", Private: "4e3f7c819a15249d2824834cd7ce20fe24d6eab8eb39ac63b78cb3713362cf78", Public: "0x9df0e70D4cb3E9beC0548D8Ac56F46596D1BcdB6"}, @@ -451,8 +451,8 @@ var InitialBeaconChainAccounts = [...]DeployAccount{ {Address: "0x193Cd8BB827F0D5Cd704B7bb9b44AAB3E5448260", Private: "58cc6ae8ab7abcb22f6f40a078e452a50baaa889c95d776b7c83d600aaa00a60", Public: "0x193Cd8BB827F0D5Cd704B7bb9b44AAB3E5448260"}, } -// InitialBeaconChainBLSAccounts are the BLS accounts for the initial beacon chain nodes. -var InitialBeaconChainBLSAccounts = [...]DeployAccount{ +// GenesisBLSAccounts are the BLS accounts for the initial genesis nodes. +var GenesisBLSAccounts = [...]DeployAccount{ {Address: "", Private: "66acb3a7c990be4b06709058fdef8122b7ecdbaf023e56ccf8cdf671c5333646", Public: ""}, {Address: "", Private: "5e9e2fffbf7cfad085d7b0147d2acd680cfd8b8d62daa9c39370185ba0207920", Public: ""}, {Address: "", Private: "56714bb94188c335d1243fa3d17fd50ff63a1a9bf740faecd97996f3a0737e87", Public: ""}, diff --git a/node/node.go b/node/node.go index 40c836e91..6626d7e33 100644 --- a/node/node.go +++ b/node/node.go @@ -149,8 +149,11 @@ type Node struct { ContractDeployerKey *ecdsa.PrivateKey ContractAddresses []common.Address - // Group Message Receiver - groupReceiver p2p.GroupReceiver + // Shard group Message Receiver + shardGroupReceiver p2p.GroupReceiver + + // Global group Message Receiver + globalGroupReceiver p2p.GroupReceiver // Client Message Receiver to handle light client messages // Beacon leader needs to use this receiver to talk to new node @@ -281,10 +284,11 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database, is if consensusObj != nil && consensusObj.IsLeader { node.State = NodeLeader - go node.ReceiveClientGroupMessage() } else { node.State = NodeInit + } + go node.ReceiveClientGroupMessage() // Setup initial state of syncing. node.peerRegistrationRecord = make(map[string]*syncConfig) @@ -297,8 +301,12 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database, is node.startConsensus = make(chan struct{}) - // init the global and the only node config - node.NodeConfig = nodeconfig.GetConfigs(nodeconfig.Global) + // Get the node config that's created in the harmony.go program. + if consensusObj != nil { + node.NodeConfig = nodeconfig.GetShardConfig(consensusObj.ShardID) + } else { + node.NodeConfig = nodeconfig.GetDefaultConfig() + } return &node } @@ -349,27 +357,41 @@ func (node *Node) RemovePeersHandler() { // isBeacon = true if the node is beacon node // isClient = true if the node light client(txgen,wallet) -func (node *Node) initNodeConfiguration(isBeacon bool, isClient bool) (service.NodeConfig, chan p2p.Peer) { +func (node *Node) initNodeConfiguration() (service.NodeConfig, chan p2p.Peer) { chanPeer := make(chan p2p.Peer) nodeConfig := service.NodeConfig{ - IsBeacon: isBeacon, - IsClient: isClient, - Beacon: p2p.GroupIDBeacon, - Group: p2p.GroupIDUnknown, - Actions: make(map[p2p.GroupID]p2p.ActionType), + IsBeacon: node.NodeConfig.IsBeacon(), + IsClient: node.NodeConfig.IsClient(), + Beacon: p2p.GroupIDBeacon, + ShardGroupID: node.NodeConfig.GetShardGroupID(), + Actions: make(map[p2p.GroupID]p2p.ActionType), } - nodeConfig.Actions[p2p.GroupIDBeaconClient] = p2p.ActionStart - var err error - if isBeacon { - node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeacon) - node.clientReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeaconClient) - node.NodeConfig.SetClientGroupID(p2p.GroupIDBeaconClient) + if nodeConfig.IsClient { + nodeConfig.Actions[p2p.GroupIDBeaconClient] = p2p.ActionStart } else { - node.groupReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeaconClient) + nodeConfig.Actions[node.NodeConfig.GetShardGroupID()] = p2p.ActionStart + } + + var err error + node.shardGroupReceiver, err = node.host.GroupReceiver(node.NodeConfig.GetShardGroupID()) + if err != nil { + utils.GetLogInstance().Error("Failed to create shard receiver", "msg", err) + } + + node.globalGroupReceiver, err = node.host.GroupReceiver(p2p.GroupIDGlobal) + if err != nil { + utils.GetLogInstance().Error("Failed to create global receiver", "msg", err) } + node.clientReceiver, err = node.host.GroupReceiver(p2p.GroupIDBeaconClient) + if err != nil { + utils.GetLogInstance().Error("Failed to create beacon client receiver", "msg", err) + } + + node.NodeConfig.SetClientGroupID(p2p.GroupIDBeaconClient) + if err != nil { utils.GetLogInstance().Error("create group receiver error", "msg", err) } diff --git a/node/node_genesis.go b/node/node_genesis.go index 62c8df565..1c5a54c11 100644 --- a/node/node_genesis.go +++ b/node/node_genesis.go @@ -86,7 +86,7 @@ func (node *Node) CreateGenesisAllocWithTestingAddresses(numAddress int) core.Ge // AddNodeAddressesToGenesisAlloc adds to the genesis block allocation the accounts used for network validators/nodes, // including the account used by the nodes of the initial beacon chain and later new nodes. func AddNodeAddressesToGenesisAlloc(genesisAlloc core.GenesisAlloc) { - for _, account := range contract.InitialBeaconChainAccounts { + for _, account := range contract.GenesisAccounts { testBankFunds := big.NewInt(InitFreeFundInEther) testBankFunds = testBankFunds.Mul(testBankFunds, big.NewInt(params.Ether)) address := common.HexToAddress(account.Address) diff --git a/node/node_handler.go b/node/node_handler.go index ee835b976..b8d11158c 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -36,11 +36,11 @@ const ( func (node *Node) ReceiveGroupMessage() { ctx := context.Background() for { - if node.groupReceiver == nil { + if node.shardGroupReceiver == nil { time.Sleep(100 * time.Millisecond) continue } - msg, sender, err := node.groupReceiver.Receive(ctx) + msg, sender, err := node.shardGroupReceiver.Receive(ctx) if sender != node.host.GetID() { // utils.GetLogInstance().Info("[PUBSUB]", "received group msg", len(msg), "sender", sender) if err == nil { @@ -57,12 +57,12 @@ func (node *Node) ReceiveClientGroupMessage() { for { if node.clientReceiver == nil { // check less frequent on client messages - time.Sleep(1000 * time.Millisecond) + time.Sleep(100 * time.Millisecond) continue } msg, sender, err := node.clientReceiver.Receive(ctx) if sender != node.host.GetID() { - utils.GetLogInstance().Info("[CLIENT]", "received group msg", len(msg), "sender", sender) + utils.GetLogInstance().Info("[CLIENT]", "received group msg", len(msg), "sender", sender, "error", err) if err == nil { // skip the first 5 bytes, 1 byte is p2p type, 4 bytes are message size go node.messageHandler(msg[5:], string(sender)) @@ -110,10 +110,10 @@ func (node *Node) messageHandler(content []byte, sender string) { msgPayload, _ := proto.GetDRandMessagePayload(content) if node.DRand != nil { if node.DRand.IsLeader { - // utils.GetLogInstance().Info("NET: DRand Leader received message") + utils.GetLogInstance().Info("NET: DRand Leader received message") node.DRand.ProcessMessageLeader(msgPayload) } else { - // utils.GetLogInstance().Info("NET: DRand Validator received message") + utils.GetLogInstance().Info("NET: DRand Validator received message") node.DRand.ProcessMessageValidator(msgPayload) } } @@ -424,7 +424,7 @@ func (node *Node) SendPongMessage() { // stable number of peers/pubkeys, sent the pong message // also make sure number of peers is greater than the minimal required number if !sentMessage && numPubKeysNow >= node.Consensus.MinPeers { - pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys, node.Consensus.GetLeaderPubKey()) + pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys, node.Consensus.GetLeaderPubKey(), node.Consensus.ShardID) buffer := pong.ConstructPongMessage() err := node.host.SendMessageToGroups([]p2p.GroupID{node.NodeConfig.GetShardGroupID()}, host.ConstructP2pMessage(byte(0), buffer)) if err != nil { @@ -452,7 +452,7 @@ func (node *Node) SendPongMessage() { // send pong message regularly to make sure new node received all the public keys // also nodes offline/online will receive the public keys peers := node.Consensus.GetValidatorPeers() - pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys, node.Consensus.GetLeaderPubKey()) + pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys, node.Consensus.GetLeaderPubKey(), node.Consensus.ShardID) buffer := pong.ConstructPongMessage() err := node.host.SendMessageToGroups([]p2p.GroupID{node.NodeConfig.GetShardGroupID()}, host.ConstructP2pMessage(byte(0), buffer)) if err != nil { @@ -472,6 +472,11 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { return -1 } + if pong.ShardID != node.Consensus.ShardID { + utils.GetLogInstance().Error("Received Pong message for the wrong shard", "receivedShardID", pong.ShardID) + return 0 + } + // set the leader pub key is the first thing to do // otherwise, we may not be able to validate the consensus messages received // which will result in first consensus timeout @@ -485,7 +490,7 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { if err != nil { utils.GetLogInstance().Error("Unmarshal DRand Leader PubKey Failed", "error", err) } else { - utils.GetLogInstance().Info("Set DRand Leader PubKey") + utils.GetLogInstance().Info("Set DRand Leader PubKey", "key", utils.GetAddressHex(node.Consensus.GetLeaderPubKey())) } peers := make([]*p2p.Peer, 0) diff --git a/node/node_newblock.go b/node/node_newblock.go index a18f70297..1939d32ca 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -21,7 +21,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}, stopChan chan defer close(stoppedChan) utils.GetLogInstance().Debug("Waiting for Consensus ready") - time.Sleep(15 * time.Second) // Wait for other nodes to be ready (test-only) + time.Sleep(30 * time.Second) // Wait for other nodes to be ready (test-only) firstTime := true var newBlock *types.Block diff --git a/node/node_test.go b/node/node_test.go index 326213b46..fb8f03f2b 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -94,7 +94,7 @@ func TestAddPeers(t *testing.T) { t.Fatalf("newhost failure: %v", err) } consensus := consensus.New(host, 0, []p2p.Peer{leader, validator}, leader, nil) - dRand := drand.New(host, 0, []p2p.Peer{leader, validator}, leader, nil, true, nil) + dRand := drand.New(host, 0, []p2p.Peer{leader, validator}, leader, nil, nil) node := New(host, consensus, nil, false) node.DRand = dRand @@ -137,7 +137,7 @@ func TestAddBeaconPeer(t *testing.T) { t.Fatalf("newhost failure: %v", err) } consensus := consensus.New(host, 0, []p2p.Peer{leader, validator}, leader, nil) - dRand := drand.New(host, 0, []p2p.Peer{leader, validator}, leader, nil, true, nil) + dRand := drand.New(host, 0, []p2p.Peer{leader, validator}, leader, nil, nil) node := New(host, consensus, nil, false) node.DRand = dRand @@ -187,7 +187,7 @@ func sendPongMessage(node *Node, leader p2p.Peer) { pubKeys := []*bls.PublicKey{pubKey1, pubKey2} leaderPubKey := pki.GetBLSPrivateKeyFromInt(888).GetPublicKey() - pong1 := proto_discovery.NewPongMessage([]p2p.Peer{p1, p2}, pubKeys, leaderPubKey) + pong1 := proto_discovery.NewPongMessage([]p2p.Peer{p1, p2}, pubKeys, leaderPubKey, 0) _ = pong1.ConstructPongMessage() } diff --git a/node/service_setup.go b/node/service_setup.go index 1b54d223c..4fd8fd033 100644 --- a/node/service_setup.go +++ b/node/service_setup.go @@ -18,12 +18,12 @@ import ( ) func (node *Node) setupForShardLeader() { - nodeConfig, chanPeer := node.initNodeConfiguration(false, false) + nodeConfig, chanPeer := node.initNodeConfiguration() // Register peer discovery service. No need to do staking for beacon chain node. node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer)) // Register networkinfo service. "0" is the beacon shard ID - node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer, nil)) + node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, node.NodeConfig.GetShardGroupID(), chanPeer, nil)) // Register explorer service. node.serviceManager.RegisterService(service.SupportExplorer, explorer.New(&node.SelfPeer, node.Consensus.GetNumPeers)) @@ -38,23 +38,24 @@ func (node *Node) setupForShardLeader() { } func (node *Node) setupForShardValidator() { - nodeConfig, chanPeer := node.initNodeConfiguration(false, false) + nodeConfig, chanPeer := node.initNodeConfiguration() // Register client support service. node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) // Register peer discovery service. "0" is the beacon shard ID. No need to do staking for beacon chain node. node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer)) // Register networkinfo service. "0" is the beacon shard ID - node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer, nil)) + node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, node.NodeConfig.GetShardGroupID(), chanPeer, nil)) } func (node *Node) setupForBeaconLeader() { - nodeConfig, chanPeer := node.initNodeConfiguration(true, false) + nodeConfig, chanPeer := node.initNodeConfiguration() // Register peer discovery service. No need to do staking for beacon chain node. node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, nil)) // Register networkinfo service. - node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer, nil)) + node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, node.NodeConfig.GetShardGroupID(), chanPeer, nil)) + // Register consensus service. node.serviceManager.RegisterService(service.Consensus, consensus.New(node.BlockChannel, node.Consensus, node.startConsensus)) // Register new block service. @@ -73,32 +74,32 @@ func (node *Node) setupForBeaconLeader() { } func (node *Node) setupForBeaconValidator() { - nodeConfig, chanPeer := node.initNodeConfiguration(true, false) + nodeConfig, chanPeer := node.initNodeConfiguration() // Register client support service. node.serviceManager.RegisterService(service.ClientSupport, clientsupport.New(node.blockchain.State, node.CallFaucetContract, node.getDeployedStakingContract, node.SelfPeer.IP, node.SelfPeer.Port)) // Register peer discovery service. No need to do staking for beacon chain node. node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, nil)) // Register networkinfo service. - node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer, nil)) + node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, node.NodeConfig.GetShardGroupID(), chanPeer, nil)) } func (node *Node) setupForNewNode() { // TODO determine the role of new node, currently assume it is beacon node - nodeConfig, chanPeer := node.initNodeConfiguration(true, true) + nodeConfig, chanPeer := node.initNodeConfiguration() // Register staking service. node.serviceManager.RegisterService(service.Staking, staking.New(node.host, node.AccountKey, node.beaconChain, node.NodeConfig.ConsensusPubKey.GetAddress())) // Register peer discovery service. "0" is the beacon shard ID node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer)) // Register networkinfo service. "0" is the beacon shard ID - node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer, nil)) + node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, node.NodeConfig.GetBeaconGroupID(), chanPeer, nil)) // TODO: how to restart networkinfo and discovery service after receiving shard id info from beacon chain? } func (node *Node) setupForClientNode() { - nodeConfig, chanPeer := node.initNodeConfiguration(false, true) + nodeConfig, chanPeer := node.initNodeConfiguration() // Register peer discovery service. node.serviceManager.RegisterService(service.PeerDiscovery, discovery.New(node.host, nodeConfig, chanPeer, node.AddBeaconPeer)) diff --git a/p2p/group.go b/p2p/group.go index ed0bb513a..0dd6cde6e 100644 --- a/p2p/group.go +++ b/p2p/group.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "strconv" libp2p_peer "github.com/libp2p/go-libp2p-peer" ) @@ -19,28 +20,37 @@ import ( type GroupID string func (id GroupID) String() string { - return fmt.Sprintf("%x", string(id)) + return fmt.Sprintf("%s", string(id)) } // Const of group ID const ( - GroupIDBeacon GroupID = "harmony/0.0.1/beacon" - GroupIDBeaconClient GroupID = "harmony/0.0.1/beacon/client" - GroupIDGlobal GroupID = "harmony/0.0.1/global" - GroupIDUnknown GroupID = "B1acKh0lE" + GroupIDBeacon GroupID = "harmony/0.0.1/node/beacon" + GroupIDBeaconClient GroupID = "harmony/0.0.1/client/beacon" + GroupIDShardPrefix GroupID = "harmony/0.0.1/node/shard/%s" + GroupIDShardClientPrefix GroupID = "harmony/0.0.1/client/shard/%s" + GroupIDGlobal GroupID = "harmony/0.0.1/node/global" + GroupIDGlobalClient GroupID = "harmony/0.0.1/node/global" + GroupIDUnknown GroupID = "B1acKh0lE" ) -// ShardIDType defines the data type of a shard ID -type ShardIDType string +// ShardID defines the ID of a shard +type ShardID uint32 -// NewGroupIDShard returns a new groupID for a shard -func NewGroupIDShard(sid ShardIDType) GroupID { - return GroupID(fmt.Sprintf("harmony/0.0.1/shard/%s", sid)) +// NewGroupIDByShardID returns a new groupID for a shard +func NewGroupIDByShardID(shardID ShardID) GroupID { + if shardID == 0 { + return GroupIDBeacon + } + return GroupID(fmt.Sprintf(GroupIDShardPrefix.String(), strconv.Itoa(int(shardID)))) } -// NewGroupIDShardClient returns a new groupID for a shard's client -func NewGroupIDShardClient(sid ShardIDType) GroupID { - return GroupID(fmt.Sprintf("harmony/0.0.1/shard/%s/client", sid)) +// NewClientGroupIDByShardID returns a new groupID for a shard's client +func NewClientGroupIDByShardID(shardID ShardID) GroupID { + if shardID == 0 { + return GroupIDBeaconClient + } + return GroupID(fmt.Sprintf(GroupIDShardClientPrefix.String(), strconv.Itoa(int(shardID)))) } // ActionType lists action on group diff --git a/p2p/group_test.go b/p2p/group_test.go index b89f22b1f..93cb85454 100644 --- a/p2p/group_test.go +++ b/p2p/group_test.go @@ -9,8 +9,7 @@ func TestGroupID_String(t *testing.T) { want string }{ {"empty", GroupID(""), ""}, - {"ABC", GroupID("ABC"), "414243"}, - {"binary", GroupID([]byte{1, 2, 3}), "010203"}, + {"ABC", GroupID("ABC"), "ABC"}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -48,11 +47,11 @@ func TestGroupAction(t *testing.T) { groupAction GroupAction expectedGroupActionName string }{ - {"BeaconStart", GroupAction{Name: GroupID("ABC"), Action: ActionStart}, "414243/ActionStart"}, - {"BeaconPause", GroupAction{Name: GroupID("ABC"), Action: ActionPause}, "414243/ActionPause"}, - {"BeaconResume", GroupAction{Name: GroupID("ABC"), Action: ActionResume}, "414243/ActionResume"}, - {"BeaconStop", GroupAction{Name: GroupID("ABC"), Action: ActionStop}, "414243/ActionStop"}, - {"BeaconUnknown", GroupAction{Name: GroupID("ABC"), Action: ActionType(8)}, "414243/ActionUnknown"}, + {"BeaconStart", GroupAction{Name: GroupID("ABC"), Action: ActionStart}, "ABC/ActionStart"}, + {"BeaconPause", GroupAction{Name: GroupID("ABC"), Action: ActionPause}, "ABC/ActionPause"}, + {"BeaconResume", GroupAction{Name: GroupID("ABC"), Action: ActionResume}, "ABC/ActionResume"}, + {"BeaconStop", GroupAction{Name: GroupID("ABC"), Action: ActionStop}, "ABC/ActionStop"}, + {"BeaconUnknown", GroupAction{Name: GroupID("ABC"), Action: ActionType(8)}, "ABC/ActionUnknown"}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/test/configs/beaconchain20.txt b/test/configs/beaconchain20.txt new file mode 100644 index 000000000..3d8dac9c9 --- /dev/null +++ b/test/configs/beaconchain20.txt @@ -0,0 +1,21 @@ +127.0.0.1 9000 validator 0 +127.0.0.1 9001 validator 0 +127.0.0.1 9002 validator 0 +127.0.0.1 9003 validator 0 +127.0.0.1 9004 validator 0 +127.0.0.1 9005 validator 0 +127.0.0.1 9006 validator 0 +127.0.0.1 9007 validator 0 +127.0.0.1 9008 validator 0 +127.0.0.1 9009 validator 0 +127.0.0.1 9010 validator 0 +127.0.0.1 9011 validator 0 +127.0.0.1 9012 validator 0 +127.0.0.1 9013 validator 0 +127.0.0.1 9014 validator 0 +127.0.0.1 9015 validator 0 +127.0.0.1 9016 validator 0 +127.0.0.1 9017 validator 0 +127.0.0.1 9018 validator 0 +127.0.0.1 9019 validator 0 +127.0.0.1 19999 client 0 diff --git a/test/configs/beaconchain50.txt b/test/configs/beaconchain40.txt similarity index 78% rename from test/configs/beaconchain50.txt rename to test/configs/beaconchain40.txt index 8f79412d7..466b2a406 100644 --- a/test/configs/beaconchain50.txt +++ b/test/configs/beaconchain40.txt @@ -1,4 +1,4 @@ -127.0.0.1 9000 leader 0 +127.0.0.1 9000 validator 0 127.0.0.1 9001 validator 0 127.0.0.1 9002 validator 0 127.0.0.1 9003 validator 0 @@ -38,14 +38,4 @@ 127.0.0.1 9037 validator 0 127.0.0.1 9038 validator 0 127.0.0.1 9039 validator 0 -127.0.0.1 9040 validator 0 -127.0.0.1 9041 validator 0 -127.0.0.1 9042 validator 0 -127.0.0.1 9043 validator 0 -127.0.0.1 9044 validator 0 -127.0.0.1 9045 validator 0 -127.0.0.1 9046 validator 0 -127.0.0.1 9047 validator 0 -127.0.0.1 9048 validator 0 -127.0.0.1 9049 validator 0 127.0.0.1 19999 client 0 diff --git a/test/deploy.sh b/test/deploy.sh index 4c65c6338..58bb36a17 100755 --- a/test/deploy.sh +++ b/test/deploy.sh @@ -130,7 +130,7 @@ sleep 1 BN_MA=$(grep "BN_MA" $log_folder/bootnode.log | awk -F\= ' { print $2 } ') HMY_OPT2=" -bootnodes $BN_MA" echo "bootnode launched." + " $BN_MA" -HMY_OPT3=" -is_beacon" +HMY_OPT3=" -is_genesis" NUM_NN=0 @@ -146,7 +146,7 @@ while IFS='' read -r line || [[ -n "$line" ]]; do fi if [ "$mode" == "leader_archival" ]; then echo "launching leader ..." - $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -account_index $i -min_peers $MIN $HMY_OPT2 $HMY_OPT3 -key /tmp/$ip-$port.key -is_leader -is_archival 2>&1 | tee -a $LOG_FILE & + $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -account_index $i -min_peers $MIN $HMY_OPT2 -key /tmp/$ip-$port.key -is_leader -is_archival 2>&1 | tee -a $LOG_FILE & fi if [ "$mode" == "validator" ]; then echo "launching validator ..." @@ -154,13 +154,12 @@ while IFS='' read -r line || [[ -n "$line" ]]; do fi if [ "$mode" == "archival" ]; then echo "launching archival node ... wait" - $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -account_index $i -min_peers $MIN $HMY_OPT2 $HMY_OPT3 -key /tmp/$ip-$port.key -is_archival 2>&1 | tee -a $LOG_FILE & + $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -account_index $i -min_peers $MIN $HMY_OPT2 -key /tmp/$ip-$port.key -is_archival 2>&1 | tee -a $LOG_FILE & fi - sleep 0.5 if [[ "$mode" == "newnode" && "$SYNC" == "true" ]]; then (( NUM_NN += 30 )) echo "launching new node ..." - (sleep $NUM_NN; $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -account_index $i -min_peers $MIN $HMY_OPT2 $HMY_OPT3 -key /tmp/$ip-$port.key 2>&1 | tee -a $LOG_FILE ) & + (sleep $NUM_NN; $DRYRUN $ROOT/bin/harmony -ip $ip -port $port -log_folder $log_folder $DB -account_index $i -min_peers $MIN $HMY_OPT2 -key /tmp/$ip-$port.key 2>&1 | tee -a $LOG_FILE ) & fi (( i++ )) done < $config