diff --git a/.travis.yml b/.travis.yml index 224d37793..89651aeb1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -24,6 +24,7 @@ install: - go get -u golang.org/x/tools/cmd/goimports - go get gopkg.in/check.v1 - go get github.com/harmony-ek/gencodec + - go get github.com/golang/mock/mockgen - go get github.com/golang/protobuf/protoc-gen-go - ./scripts/install_protoc.sh -V 3.6.1 - ./scripts/travis_checker.sh diff --git a/api/service/discovery/discovery_test.go b/api/service/discovery/discovery_test.go index e9f3043c9..1eb35fc22 100644 --- a/api/service/discovery/discovery_test.go +++ b/api/service/discovery/discovery_test.go @@ -2,6 +2,7 @@ package discovery import ( "testing" + "time" "github.com/harmony-one/harmony/api/service" "github.com/harmony-one/harmony/internal/utils" @@ -16,10 +17,17 @@ var ( ) func TestDiscoveryService(t *testing.T) { - selfPeer := p2p.Peer{IP: ip, Port: port} - priKey, _, err := utils.GenKeyP2P(ip, port) + nodePriKey, _, err := utils.LoadKeyFromFile("/tmp/127.0.0.1.12345.key") + if err != nil { + t.Fatal(err) + } + peerPriKey, peerPubKey := utils.GenKey("127.0.0.1", "12345") + if peerPriKey == nil || peerPubKey == nil { + t.Fatal("generate key error") + } + selfPeer := p2p.Peer{IP: "127.0.0.1", Port: "12345", ValidatorID: -1, PubKey: peerPubKey} - host, err := p2pimpl.NewHost(&selfPeer, priKey) + host, err := p2pimpl.NewHost(&selfPeer, nodePriKey) if err != nil { t.Fatalf("unable to new host in harmony: %v", err) } @@ -31,4 +39,10 @@ func TestDiscoveryService(t *testing.T) { if dService == nil { t.Fatalf("unable to create new discovery service") } + + dService.StartService() + + time.Sleep(3 * time.Second) + + dService.StopService() } diff --git a/api/service/discovery/service.go b/api/service/discovery/service.go index 1f22c5bd0..940a4f498 100644 --- a/api/service/discovery/service.go +++ b/api/service/discovery/service.go @@ -81,6 +81,7 @@ func (s *Service) contactP2pPeers() { pingMsg.Node.Role = proto_node.ClientRole clientMsgBuf := host.ConstructP2pMessage(byte(0), pingMsg.ConstructPingMessage()) + s.sentPingMessage(p2p.GroupIDBeacon, regMsgBuf, clientMsgBuf) for { select { case peer, ok := <-s.peerChan: @@ -104,7 +105,6 @@ func (s *Service) contactP2pPeers() { case action := <-s.actionChan: s.config.Actions[action.Name] = action.Action case <-tick.C: - var err error for g, a := range s.config.Actions { if a == p2p.ActionPause { // Recived Pause Message, to reduce the frequency of ping message to every 1 minute @@ -115,38 +115,46 @@ func (s *Service) contactP2pPeers() { } if a == p2p.ActionStart || a == p2p.ActionResume || a == p2p.ActionPause { - if g == p2p.GroupIDBeacon || g == p2p.GroupIDBeaconClient { - if s.config.IsBeacon { - // beacon chain node - err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Beacon}, regMsgBuf) - } else { - // non-beacon chain node, reg as client node - err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Beacon}, clientMsgBuf) - } - } else { - // The following logical will be used for 2nd stage peer discovery process - if s.config.Group == p2p.GroupIDUnknown { - continue - } - if s.config.IsClient { - // client node of reg shard, such as wallet/txgen - err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Group}, clientMsgBuf) - } else { - // regular node of a shard - err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Group}, regMsgBuf) - } - } - if err != nil { - utils.GetLogInstance().Error("[DISCOVERY] Failed to send ping message", "group", g) - } else { - //utils.GetLogInstance().Info("[DISCOVERY]", "Sent Ping Message", g) - } + s.sentPingMessage(g, regMsgBuf, clientMsgBuf) } } } } } +// sentPingMessage sends a ping message to a pubsub topic +func (s *Service) sentPingMessage(g p2p.GroupID, regMsgBuf, clientMsgBuf []byte) { + var err error + if g == p2p.GroupIDBeacon || g == p2p.GroupIDBeaconClient { + if s.config.IsBeacon { + // beacon chain node + err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Beacon}, regMsgBuf) + } else { + // non-beacon chain node, reg as client node + err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Beacon}, clientMsgBuf) + } + } else { + // The following logical will be used for 2nd stage peer discovery process + // do nothing when the groupID is unknown + if s.config.Group == p2p.GroupIDUnknown { + return + } + if s.config.IsClient { + // client node of reg shard, such as wallet/txgen + err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Group}, clientMsgBuf) + } else { + // regular node of a shard + err = s.host.SendMessageToGroups([]p2p.GroupID{s.config.Group}, regMsgBuf) + } + } + if err != nil { + utils.GetLogInstance().Error("Failed to send ping message", "group", g) + // } else { + // utils.GetLogInstance().Info("[DISCOVERY]", "Sent Ping Message", g) + } + +} + // Init is to initialize for discoveryService. func (s *Service) Init() { utils.GetLogInstance().Info("Init discovery service") diff --git a/api/service/manager.go b/api/service/manager.go index d5b88afcf..e2fcad4d9 100644 --- a/api/service/manager.go +++ b/api/service/manager.go @@ -31,6 +31,7 @@ const ( BlockProposal NetworkInfo PeerDiscovery + Resharding Staking Test Done @@ -56,6 +57,8 @@ func (t Type) String() string { return "Staking" case PeerDiscovery: return "PeerDiscovery" + case Resharding: + return "Resharding" case Test: return "Test" case Done: @@ -102,6 +105,10 @@ func (m *Manager) Register(t Type, service Interface) { if m.services == nil { m.services = make(map[Type]Interface) } + if _, ok := m.services[t]; ok { + utils.GetLogInstance().Error("This service is already included: ", "servie", t) + return + } m.services[t] = service } @@ -185,3 +192,24 @@ func (m *Manager) SetupServiceMessageChan(mapServiceTypeChan map[Type]chan *msg_ service.SetMessageChan(mapServiceTypeChan[serviceType]) } } + +// StopService stops service with type t. +func (m *Manager) StopService(t Type) { + if service, ok := m.services[t]; ok { + service.StopService() + } +} + +// StopServicesByRole stops all service of the given role. +func (m *Manager) StopServicesByRole(liveServices []Type) { + marked := make(map[Type]bool) + for _, s := range liveServices { + marked[s] = true + } + + for t := range m.GetServices() { + if _, ok := marked[t]; !ok { + m.StopService(t) + } + } +} diff --git a/api/service/manager_test.go b/api/service/manager_test.go index 8574e11df..28b809965 100644 --- a/api/service/manager_test.go +++ b/api/service/manager_test.go @@ -10,14 +10,21 @@ import ( type SupportSyncingTest struct { msgChan chan *msg_pb.Message + status *int } func (s *SupportSyncingTest) StartService() { fmt.Println("SupportSyncingTest starting") + if s.status != nil { + *s.status = 1 + } } func (s *SupportSyncingTest) StopService() { fmt.Println("SupportSyncingTest stopping") + if s.status != nil { + *s.status = 2 + } } func (s *SupportSyncingTest) NotifyService(data map[string]interface{}) { @@ -65,3 +72,26 @@ func TestMessageChan(t *testing.T) { m.SendAction(&Action{ServiceType: Done}) } + +func TestStopServices(t *testing.T) { + m := &Manager{} + m.SetupServiceManager() + status := 0 + m.RegisterService(SupportSyncing, &SupportSyncingTest{status: &status}) + msgChans := make(map[Type]chan *msg_pb.Message) + m.SetupServiceMessageChan(msgChans) + + m.SendAction(&Action{ + Action: Notify, + ServiceType: SupportSyncing, + Params: map[string]interface{}{ + "chan": msgChans[SupportSyncing], + "test": t, + }, + }) + + m.StopServicesByRole([]Type{}) + if status != 2 { + t.Error("Service did not stop") + } +} diff --git a/api/service/rconversion/service.go b/api/service/resharding/service.go similarity index 66% rename from api/service/rconversion/service.go rename to api/service/resharding/service.go index f1ca41d2c..9377ecf88 100644 --- a/api/service/rconversion/service.go +++ b/api/service/resharding/service.go @@ -1,20 +1,29 @@ -package rconversion +package resharding import ( + "time" + msg_pb "github.com/harmony-one/harmony/api/proto/message" + "github.com/harmony-one/harmony/core" "github.com/harmony-one/harmony/internal/utils" ) +// Constants for resharding service. +const ( + ReshardingCheckTime = time.Second +) + // Service is the role conversion service. type Service struct { stopChan chan struct{} stoppedChan chan struct{} messageChan chan *msg_pb.Message + beaconChain *core.BlockChain } // New returns role conversion service. -func New() *Service { - return &Service{} +func New(beaconChain *core.BlockChain) *Service { + return &Service{beaconChain: beaconChain} } // StartService starts role conversion service. @@ -49,6 +58,18 @@ func (s *Service) Run(stopChan chan struct{}, stoppedChan chan struct{}) { // DoService does role conversion. func (s *Service) DoService() { + tick := time.NewTicker(ReshardingCheckTime) + // Get current shard state hash. + currentShardStateHash := s.beaconChain.CurrentBlock().Header().ShardStateHash + for { + select { + case <-tick.C: + LatestShardStateHash := s.beaconChain.CurrentBlock().Header().ShardStateHash + if currentShardStateHash != LatestShardStateHash { + // TODO(minhdoan): Add resharding logic later after modifying the resharding func as it current doesn't calculate the role (leader/validator) + } + } + } } // StopService stops role conversion service. diff --git a/api/service/staking/service.go b/api/service/staking/service.go index f37f6beb4..745d9226e 100644 --- a/api/service/staking/service.go +++ b/api/service/staking/service.go @@ -5,12 +5,11 @@ import ( "math/big" "time" - "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" + "github.com/ethereum/go-ethereum/rlp" protobuf "github.com/golang/protobuf/proto" proto "github.com/harmony-one/harmony/api/client/service/proto" proto_common "github.com/harmony-one/harmony/api/proto" @@ -178,11 +177,12 @@ func (s *Service) createRawStakingMessage() []byte { tx := types.NewTransaction( stakingInfo.Nonce, toAddress, - 0, // beacon chain. + 0, big.NewInt(s.stakingAmount), - params.TxGas*10, // hard-code - nil, // pick some predefined gas price. - common.FromHex("0xd0e30db0")) // Refer to Node.DepositFuncSignature + params.TxGas*10, + nil, + common.FromHex("0xd0e30db0"), + ) if signedTx, err := types.SignTx(tx, types.HomesteadSigner{}, s.accountKey); err == nil { ts := types.Transactions{signedTx} diff --git a/cmd/client/txgen/main.go b/cmd/client/txgen/main.go index d2e41c96f..884efe232 100644 --- a/cmd/client/txgen/main.go +++ b/cmd/client/txgen/main.go @@ -123,11 +123,7 @@ func main() { clientNode.Client = client.NewClient(clientNode.GetHost(), shardIDLeaderMap) readySignal := make(chan uint32) - go func() { - for i := range shardIDLeaderMap { - readySignal <- i - } - }() + // This func is used to update the client's blockchain when new blocks are received from the leaders updateBlocksFunc := func(blocks []*types.Block) { utils.GetLogInstance().Info("[Txgen] Received new block", "block", blocks) @@ -158,12 +154,18 @@ func main() { // Start the client server to listen to leader's message go clientNode.StartServer() - // wait for 1 seconds for client to send ping message to leader - time.Sleep(time.Second) clientNode.State = node.NodeReadyForConsensus + go func() { + // wait for 3 seconds for client to send ping message to leader + // FIXME (leo) the readySignal should be set once we really sent ping message to leader + time.Sleep(3 * time.Second) // wait for nodes to be ready + for i := range shardIDLeaderMap { + readySignal <- i + } + }() + // Transaction generation process - time.Sleep(5 * time.Second) // wait for nodes to be ready start := time.Now() totalTime := float64(*duration) diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index a066575d7..88c1ca09a 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -58,11 +58,6 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan *types.Block, stop utils.GetLogInstance().Debug("WaitForNewBlock", "removed peers", c) } - for !consensus.HasEnoughValidators() { - utils.GetLogInstance().Debug("Not enough validators", "# Validators", len(consensus.PublicKeys)) - time.Sleep(waitForEnoughValidators * time.Millisecond) - } - if core.IsEpochBlock(newBlock) { // Receive pRnd from DRG protocol utils.GetLogInstance().Debug("[DRG] Waiting for pRnd") @@ -365,14 +360,3 @@ func (consensus *Consensus) reportMetrics(block types.Block) { } profiler.LogMetrics(metrics) } - -// HasEnoughValidators checks the number of publicKeys to determine -// if the shard has enough validators -// FIXME (HAR-82): we need epoch support or a better way to determine -// when to initiate the consensus -func (consensus *Consensus) HasEnoughValidators() bool { - if len(consensus.PublicKeys) < consensus.MinPeers { - return false - } - return true -} diff --git a/core/blockchain.go b/core/blockchain.go index a51227bb1..5a468b52d 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1256,6 +1256,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks) (int, []interface{}, []*ty stats.report(chain, i, cache) // only insert new shardstate when block is epoch block + // TODO(minhdoan): this is only for beaconc chain bc.InsertNewShardState(block) } // Append a single chain head event if we've progressed the chain @@ -1735,6 +1736,7 @@ func (bc *BlockChain) ValidateNewShardState(block *types.Block) error { // InsertNewShardState insert new shard state into epoch block func (bc *BlockChain) InsertNewShardState(block *types.Block) { + // write state into db. shardState := bc.GetNewShardState(block) if shardState == nil { return diff --git a/core/resharding.go b/core/resharding.go index 385312e62..2cc79f426 100644 --- a/core/resharding.go +++ b/core/resharding.go @@ -13,6 +13,8 @@ import ( const ( // InitialSeed is the initial random seed, a magic number to answer everything, remove later InitialSeed uint32 = 42 + // FirstEpoch is the number of the first epoch. + FirstEpoch = 0 ) // ShardingState is data structure hold the sharding state @@ -47,36 +49,50 @@ func (ss *ShardingState) assignNewNodes(newNodeList []types.NodeID) { // cuckooResharding uses cuckoo rule to reshard X% of active committee(shards) into inactive committee(shards) func (ss *ShardingState) cuckooResharding(percent float64) { - ss.sortCommitteeBySize() numActiveShards := ss.numShards / 2 kickedNodes := []types.NodeID{} for i := range ss.shardState { if i >= numActiveShards { break } - Shuffle(ss.shardState[i].NodeList) numKicked := int(percent * float64(len(ss.shardState[i].NodeList))) - tmp := ss.shardState[i].NodeList[:numKicked] + if numKicked == 0 { + numKicked++ + } + length := len(ss.shardState[i].NodeList) + tmp := ss.shardState[i].NodeList[length-numKicked:] kickedNodes = append(kickedNodes, tmp...) - ss.shardState[i].NodeList = ss.shardState[i].NodeList[numKicked:] + ss.shardState[i].NodeList = ss.shardState[i].NodeList[:length-numKicked] } Shuffle(kickedNodes) + numInactiveShards := ss.numShards - numActiveShards for i, nid := range kickedNodes { - id := numActiveShards + i%(ss.numShards-numActiveShards) + id := numActiveShards + i%numInactiveShards ss.shardState[id].NodeList = append(ss.shardState[id].NodeList, nid) } } +// UpdateShardState will first add new nodes into shards, then use cuckoo rule to reshard to get new shard state +func (ss *ShardingState) assignLeaders() { + for i := 0; i < ss.numShards; i++ { + Shuffle(ss.shardState[i].NodeList) + ss.shardState[i].Leader = ss.shardState[i].NodeList[0] + } +} + // UpdateShardState will first add new nodes into shards, then use cuckoo rule to reshard to get new shard state func (ss *ShardingState) UpdateShardState(newNodeList []types.NodeID, percent float64) { rand.Seed(int64(ss.rnd)) + ss.sortCommitteeBySize() + ss.assignLeaders() ss.assignNewNodes(newNodeList) ss.cuckooResharding(percent) } // Shuffle will shuffle the list with result uniquely determined by seed, assuming there is no repeat items in the list func Shuffle(list []types.NodeID) { + // Sort to make sure everyone will generate the same with the same rand seed. sort.Slice(list, func(i, j int) bool { return types.CompareNodeID(list[i], list[j]) == -1 }) @@ -96,16 +112,6 @@ func GetEpochFromBlockNumber(blockNumber uint64) uint64 { return blockNumber / uint64(BlocksPerEpoch) } -// GetPreviousEpochBlockNumber gets the epoch block number of previous epoch -func GetPreviousEpochBlockNumber(blockNumber uint64) uint64 { - epoch := GetEpochFromBlockNumber(blockNumber) - if epoch == 1 { - // no previous epoch - return epoch - } - return GetBlockNumberFromEpoch(epoch - 1) -} - // GetShardingStateFromBlockChain will retrieve random seed and shard map from beacon chain for given a epoch func GetShardingStateFromBlockChain(bc *BlockChain, epoch uint64) *ShardingState { number := GetBlockNumberFromEpoch(epoch) @@ -120,8 +126,8 @@ func GetShardingStateFromBlockChain(bc *BlockChain, epoch uint64) *ShardingState // CalculateNewShardState get sharding state from previous epoch and calcualte sharding state for new epoch // TODO: currently, we just mock everything func CalculateNewShardState(bc *BlockChain, epoch uint64) types.ShardState { - if epoch == 1 { - return fakeGetInitShardState() + if epoch == FirstEpoch { + return fakeGetInitShardState(6, 10) } ss := GetShardingStateFromBlockChain(bc, epoch-1) newNodeList := fakeNewNodeList(int64(ss.rnd)) @@ -140,23 +146,17 @@ func (ss *ShardingState) calculateKickoutRate(newNodeList []types.NodeID) float6 return 0.0 } rate := newNodesPerShard / float64(L) - return math.Min(rate, 1.0) -} - -// FakeGenRandSeed generate random seed based on previous rnd seed; remove later after VRF implemented -func FakeGenRandSeed(seed uint32) uint32 { - rand.Seed(int64(seed)) - return rand.Uint32() + return math.Max(0.1, math.Min(rate, 1.0)) } // remove later after bootstrap codes ready -func fakeGetInitShardState() types.ShardState { +func fakeGetInitShardState(numberOfShards, numOfNodes int) types.ShardState { rand.Seed(int64(InitialSeed)) shardState := types.ShardState{} - for i := 0; i < 6; i++ { + for i := 0; i < numberOfShards; i++ { sid := uint32(i) com := types.Committee{ShardID: sid} - for j := 0; j < 10; j++ { + for j := 0; j < numOfNodes; j++ { nid := strconv.Itoa(int(rand.Int63())) com.NodeList = append(com.NodeList, types.NodeID(nid)) } diff --git a/core/resharding_test.go b/core/resharding_test.go index 6d027ef37..d5d92a0ec 100644 --- a/core/resharding_test.go +++ b/core/resharding_test.go @@ -14,7 +14,19 @@ func TestFakeNewNodeList(t *testing.T) { } func TestShuffle(t *testing.T) { - nodeList := []types.NodeID{"node1", "node2", "node3", "node4", "node5", "node6", "node7", "node8", "node9", "node10"} + nodeList := []types.NodeID{ + "node1", + "node2", + "node3", + "node4", + "node5", + "node6", + "node7", + "node8", + "node9", + "node10", + } + cpList := []types.NodeID{} cpList = append(cpList, nodeList...) Shuffle(nodeList) @@ -31,17 +43,49 @@ func TestShuffle(t *testing.T) { } func TestSortCommitteeBySize(t *testing.T) { - shardState := fakeGetInitShardState() + shardState := fakeGetInitShardState(6, 10) ss := &ShardingState{epoch: 1, rnd: 42, shardState: shardState, numShards: len(shardState)} ss.sortCommitteeBySize() for i := 0; i < ss.numShards-1; i++ { assert.Equal(t, true, len(ss.shardState[i].NodeList) >= len(ss.shardState[i+1].NodeList)) } +} + +func TestUpdateShardState(t *testing.T) { + shardState := fakeGetInitShardState(6, 10) + ss := &ShardingState{epoch: 1, rnd: 42, shardState: shardState, numShards: len(shardState)} + newNodeList := []types.NodeID{ + "node1", + "node2", + "node3", + "node4", + "node5", + "node6", + } + + ss.UpdateShardState(newNodeList, 0.2) + assert.Equal(t, 6, ss.numShards) + for _, shard := range ss.shardState { + assert.Equal(t, string(shard.Leader), string(shard.NodeList[0])) + } +} + +func TestAssignNewNodes(t *testing.T) { + shardState := fakeGetInitShardState(2, 2) + ss := &ShardingState{epoch: 1, rnd: 42, shardState: shardState, numShards: len(shardState)} + newNodes := []types.NodeID{ + "node1", + "node2", + "node3", + } + ss.assignNewNodes(newNodes) + assert.Equal(t, 2, ss.numShards) + assert.Equal(t, 5, len(ss.shardState[0].NodeList)) } func TestCalculateKickoutRate(t *testing.T) { - shardState := fakeGetInitShardState() + shardState := fakeGetInitShardState(6, 10) ss := &ShardingState{epoch: 1, rnd: 42, shardState: shardState, numShards: len(shardState)} newNodeList := fakeNewNodeList(42) percent := ss.calculateKickoutRate(newNodeList) diff --git a/core/types/shard_state.go b/core/types/shard_state.go index 879d62629..399c4477c 100644 --- a/core/types/shard_state.go +++ b/core/types/shard_state.go @@ -2,21 +2,23 @@ package types import ( "sort" + "strings" "github.com/ethereum/go-ethereum/common" "golang.org/x/crypto/sha3" ) -// NodeID is a unique ID represent a node -type NodeID string - // ShardState is the collection of all committees type ShardState []Committee +// NodeID represents node id. +type NodeID string + // Committee contains the active nodes in one shard type Committee struct { ShardID uint32 - NodeList []NodeID // a list of NodeID where NodeID is represented by a string + Leader NodeID + NodeList []NodeID } // GetHashFromNodeList will sort the list, then use Keccak256 to hash the list @@ -53,16 +55,10 @@ func (ss ShardState) Hash() (h common.Hash) { // CompareNodeID compares two nodes by their ID; used to sort node list func CompareNodeID(n1 NodeID, n2 NodeID) int { - if n1 < n2 { - return -1 - } - if n1 > n2 { - return 1 - } - return 0 + return strings.Compare(string(n1), string(n2)) } // Serialize serialize NodeID into bytes -func (n NodeID) Serialize() []byte { - return []byte(n) +func (n *NodeID) Serialize() []byte { + return []byte(string(*n)) } diff --git a/core/types/shard_state_test.go b/core/types/shard_state_test.go index fa6a902b4..0d5c45dbd 100644 --- a/core/types/shard_state_test.go +++ b/core/types/shard_state_test.go @@ -6,8 +6,16 @@ import ( ) func TestGetHashFromNodeList(t *testing.T) { - l1 := []NodeID{"node1", "node2", "node3"} - l2 := []NodeID{"node2", "node1", "node3"} + l1 := []NodeID{ + "node1", + "node2", + "node3", + } + l2 := []NodeID{ + "node2", + "node1", + "node3", + } h1 := GetHashFromNodeList(l1) h2 := GetHashFromNodeList(l2) @@ -17,13 +25,46 @@ func TestGetHashFromNodeList(t *testing.T) { } func TestHash(t *testing.T) { - com1 := Committee{ShardID: 22, NodeList: []NodeID{"node11", "node22", "node1"}} - com2 := Committee{ShardID: 2, NodeList: []NodeID{"node4", "node5", "node6"}} + com1 := Committee{ + ShardID: 22, + Leader: "node11", + NodeList: []NodeID{ + "node11", + "node22", + "node1", + }, + } + com2 := Committee{ + ShardID: 2, + Leader: "node4", + NodeList: []NodeID{ + "node4", + "node5", + "node6", + }, + } shardState1 := ShardState{com1, com2} h1 := shardState1.Hash() - com3 := Committee{ShardID: 2, NodeList: []NodeID{"node6", "node5", "node4"}} - com4 := Committee{ShardID: 22, NodeList: []NodeID{"node1", "node11", "node22"}} + com3 := Committee{ + ShardID: 2, + Leader: "node4", + NodeList: []NodeID{ + "node6", + "node5", + "node4", + }, + } + com4 := Committee{ + ShardID: 22, + Leader: "node11", + NodeList: []NodeID{ + "node1", + "node11", + "node22", + }, + } + shardState2 := ShardState{com3, com4} h2 := shardState2.Hash() diff --git a/node/node.go b/node/node.go index 7694559d6..4a85a3546 100644 --- a/node/node.go +++ b/node/node.go @@ -484,3 +484,19 @@ func (node *Node) initBeaconNodeConfiguration() (service.NodeConfig, chan p2p.Pe return nodeConfig, chanPeer } + +// AddBeaconChainDatabase adds database support for beaconchain blocks on normal sharding nodes (not BeaconChain node) +func (node *Node) AddBeaconChainDatabase(db ethdb.Database) { + database := db + if database == nil { + database = ethdb.NewMemDatabase() + } + // TODO (chao) currently we use the same genesis block as normal shard + chain, err := node.GenesisBlockSetup(database) + if err != nil { + utils.GetLogInstance().Error("Error when doing genesis setup") + os.Exit(1) + } + node.beaconChain = chain + node.BeaconWorker = worker.New(params.TestChainConfig, chain, node.Consensus, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey), node.Consensus.ShardID) +} diff --git a/node/node_handler.go b/node/node_handler.go index bc889cf52..512108b66 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -390,7 +390,8 @@ func (node *Node) SendPongMessage() { sentMessage = false } else { // stable number of peers/pubkeys, sent the pong message - if !sentMessage { + // also make sure number of peers is greater than the minimal required number + if !sentMessage && numPubKeysNow >= node.Consensus.MinPeers { pong := proto_discovery.NewPongMessage(peers, node.Consensus.PublicKeys, node.Consensus.GetLeaderPubKey()) buffer := pong.ConstructPongMessage() err := node.host.SendMessageToGroups([]p2p.GroupID{node.MyShardGroupID}, host.ConstructP2pMessage(byte(0), buffer)) @@ -403,6 +404,8 @@ func (node *Node) SendPongMessage() { sentMessage = true // stop sending ping message node.serviceManager.TakeAction(&service.Action{Action: service.Stop, ServiceType: service.PeerDiscovery}) + // wait a bit until all validators received pong message + time.Sleep(200 * time.Millisecond) node.startConsensus <- struct{}{} } } @@ -419,6 +422,22 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { return -1 } + // set the leader pub key is the first thing to do + // otherwise, we may not be able to validate the consensus messages received + // which will result in first consensus timeout + err = node.Consensus.SetLeaderPubKey(pong.LeaderPubKey) + if err != nil { + utils.GetLogInstance().Error("Unmarshal Consensus Leader PubKey Failed", "error", err) + } else { + utils.GetLogInstance().Info("Set Consensus Leader PubKey") + } + err = node.DRand.SetLeaderPubKey(pong.LeaderPubKey) + if err != nil { + utils.GetLogInstance().Error("Unmarshal DRand Leader PubKey Failed", "error", err) + } else { + utils.GetLogInstance().Info("Set DRand Leader PubKey") + } + peers := make([]*p2p.Peer, 0) for _, p := range pong.Peers { @@ -441,15 +460,6 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int { node.AddPeers(peers) } - err = node.Consensus.SetLeaderPubKey(pong.LeaderPubKey) - if err != nil { - utils.GetLogInstance().Error("Unmarshal Consensus Leader PubKey Failed", "error", err) - } - err = node.DRand.SetLeaderPubKey(pong.LeaderPubKey) - if err != nil { - utils.GetLogInstance().Error("Unmarshal DRand Leader PubKey Failed", "error", err) - } - // Reset Validator PublicKeys every time we receive PONG message from Leader // The PublicKeys has to be idential across the shard on every node // TODO (lc): we need to handle RemovePeer situation diff --git a/node/node_newblock.go b/node/node_newblock.go index e65a612aa..19c130571 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -58,6 +58,7 @@ func (node *Node) WaitForConsensusReady(readySignal chan struct{}, stopChan chan utils.GetLogInstance().Debug("Failed commiting new block", "Error", err) } else { // add new shard state if it's epoch block + // TODO(minhdoan): only happens for beaconchain node.addNewShardState(block) newBlock = block break diff --git a/node/service_setup.go b/node/service_setup.go index 08b8afb5a..6bdc20b49 100644 --- a/node/service_setup.go +++ b/node/service_setup.go @@ -1,12 +1,6 @@ package node import ( - "os" - - "github.com/harmony-one/harmony/api/service/staking" - - "github.com/ethereum/go-ethereum/ethdb" - "github.com/ethereum/go-ethereum/params" msg_pb "github.com/harmony-one/harmony/api/proto/message" "github.com/harmony-one/harmony/api/service" "github.com/harmony-one/harmony/api/service/blockproposal" @@ -16,9 +10,8 @@ import ( "github.com/harmony-one/harmony/api/service/explorer" "github.com/harmony-one/harmony/api/service/networkinfo" "github.com/harmony-one/harmony/api/service/randomness" - "github.com/harmony-one/harmony/crypto/pki" + "github.com/harmony-one/harmony/api/service/staking" "github.com/harmony-one/harmony/internal/utils" - "github.com/harmony-one/harmony/node/worker" "github.com/harmony-one/harmony/p2p" ) @@ -99,22 +92,6 @@ func (node *Node) setupForClientNode() { node.serviceManager.RegisterService(service.NetworkInfo, networkinfo.New(node.host, p2p.GroupIDBeacon, chanPeer, nil)) } -// AddBeaconChainDatabase adds database support for beaconchain blocks on normal sharding nodes (not BeaconChain node) -func (node *Node) AddBeaconChainDatabase(db ethdb.Database) { - database := db - if database == nil { - database = ethdb.NewMemDatabase() - } - // TODO (chao) currently we use the same genesis block as normal shard - chain, err := node.GenesisBlockSetup(database) - if err != nil { - utils.GetLogInstance().Error("Error when doing genesis setup") - os.Exit(1) - } - node.beaconChain = chain - node.BeaconWorker = worker.New(params.TestChainConfig, chain, node.Consensus, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey), node.Consensus.ShardID) -} - // ServiceManagerSetup setups service store. func (node *Node) ServiceManagerSetup() { node.serviceManager = &service.Manager{} @@ -144,3 +121,12 @@ func (node *Node) RunServices() { } node.serviceManager.RunServices() } + +// StopServices runs registered services. +func (node *Node) StopServices() { + if node.serviceManager == nil { + utils.GetLogInstance().Info("Service manager is not set up yet.") + return + } + node.serviceManager.StopServicesByRole([]service.Type{}) +} diff --git a/p2p/host/hostv2/mock/hostv2_mock.go b/p2p/host/hostv2/mock/hostv2_mock.go index d86af79c9..6a1e05a69 100644 --- a/p2p/host/hostv2/mock/hostv2_mock.go +++ b/p2p/host/hostv2/mock/hostv2_mock.go @@ -36,6 +36,7 @@ func (m *Mockpubsub) EXPECT() *MockpubsubMockRecorder { // Publish mocks base method func (m *Mockpubsub) Publish(topic string, data []byte) error { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Publish", topic, data) ret0, _ := ret[0].(error) return ret0 @@ -43,11 +44,13 @@ func (m *Mockpubsub) Publish(topic string, data []byte) error { // Publish indicates an expected call of Publish func (mr *MockpubsubMockRecorder) Publish(topic, data interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*Mockpubsub)(nil).Publish), topic, data) } // Subscribe mocks base method func (m *Mockpubsub) Subscribe(topic string, opts ...go_libp2p_pubsub.SubOpt) (*go_libp2p_pubsub.Subscription, error) { + m.ctrl.T.Helper() varargs := []interface{}{topic} for _, a := range opts { varargs = append(varargs, a) @@ -60,6 +63,7 @@ func (m *Mockpubsub) Subscribe(topic string, opts ...go_libp2p_pubsub.SubOpt) (* // Subscribe indicates an expected call of Subscribe func (mr *MockpubsubMockRecorder) Subscribe(topic interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() varargs := append([]interface{}{topic}, opts...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*Mockpubsub)(nil).Subscribe), varargs...) } @@ -89,6 +93,7 @@ func (m *Mocksubscription) EXPECT() *MocksubscriptionMockRecorder { // Next mocks base method func (m *Mocksubscription) Next(ctx context.Context) (*go_libp2p_pubsub.Message, error) { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Next", ctx) ret0, _ := ret[0].(*go_libp2p_pubsub.Message) ret1, _ := ret[1].(error) @@ -97,15 +102,18 @@ func (m *Mocksubscription) Next(ctx context.Context) (*go_libp2p_pubsub.Message, // Next indicates an expected call of Next func (mr *MocksubscriptionMockRecorder) Next(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Next", reflect.TypeOf((*Mocksubscription)(nil).Next), ctx) } // Cancel mocks base method func (m *Mocksubscription) Cancel() { + m.ctrl.T.Helper() m.ctrl.Call(m, "Cancel") } // Cancel indicates an expected call of Cancel func (mr *MocksubscriptionMockRecorder) Cancel() *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Cancel", reflect.TypeOf((*Mocksubscription)(nil).Cancel)) } diff --git a/p2p/host/mock/host_mock.go b/p2p/host/mock/host_mock.go index cf7d8fa4a..bdf4b8720 100644 --- a/p2p/host/mock/host_mock.go +++ b/p2p/host/mock/host_mock.go @@ -37,6 +37,7 @@ func (m *MockHost) EXPECT() *MockHostMockRecorder { // GetSelfPeer mocks base method func (m *MockHost) GetSelfPeer() p2p.Peer { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetSelfPeer") ret0, _ := ret[0].(p2p.Peer) return ret0 @@ -44,11 +45,13 @@ func (m *MockHost) GetSelfPeer() p2p.Peer { // GetSelfPeer indicates an expected call of GetSelfPeer func (mr *MockHostMockRecorder) GetSelfPeer() *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSelfPeer", reflect.TypeOf((*MockHost)(nil).GetSelfPeer)) } // Close mocks base method func (m *MockHost) Close() error { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Close") ret0, _ := ret[0].(error) return ret0 @@ -56,11 +59,13 @@ func (m *MockHost) Close() error { // Close indicates an expected call of Close func (mr *MockHostMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockHost)(nil).Close)) } // AddPeer mocks base method func (m *MockHost) AddPeer(arg0 *p2p.Peer) error { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AddPeer", arg0) ret0, _ := ret[0].(error) return ret0 @@ -68,11 +73,13 @@ func (m *MockHost) AddPeer(arg0 *p2p.Peer) error { // AddPeer indicates an expected call of AddPeer func (mr *MockHostMockRecorder) AddPeer(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddPeer", reflect.TypeOf((*MockHost)(nil).AddPeer), arg0) } // GetID mocks base method func (m *MockHost) GetID() go_libp2p_peer.ID { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetID") ret0, _ := ret[0].(go_libp2p_peer.ID) return ret0 @@ -80,11 +87,13 @@ func (m *MockHost) GetID() go_libp2p_peer.ID { // GetID indicates an expected call of GetID func (mr *MockHostMockRecorder) GetID() *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetID", reflect.TypeOf((*MockHost)(nil).GetID)) } // GetP2PHost mocks base method func (m *MockHost) GetP2PHost() go_libp2p_host.Host { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetP2PHost") ret0, _ := ret[0].(go_libp2p_host.Host) return ret0 @@ -92,21 +101,25 @@ func (m *MockHost) GetP2PHost() go_libp2p_host.Host { // GetP2PHost indicates an expected call of GetP2PHost func (mr *MockHostMockRecorder) GetP2PHost() *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetP2PHost", reflect.TypeOf((*MockHost)(nil).GetP2PHost)) } // ConnectHostPeer mocks base method func (m *MockHost) ConnectHostPeer(arg0 p2p.Peer) { + m.ctrl.T.Helper() m.ctrl.Call(m, "ConnectHostPeer", arg0) } // ConnectHostPeer indicates an expected call of ConnectHostPeer func (mr *MockHostMockRecorder) ConnectHostPeer(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConnectHostPeer", reflect.TypeOf((*MockHost)(nil).ConnectHostPeer), arg0) } // SendMessageToGroups mocks base method func (m *MockHost) SendMessageToGroups(groups []p2p.GroupID, msg []byte) error { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SendMessageToGroups", groups, msg) ret0, _ := ret[0].(error) return ret0 @@ -114,11 +127,13 @@ func (m *MockHost) SendMessageToGroups(groups []p2p.GroupID, msg []byte) error { // SendMessageToGroups indicates an expected call of SendMessageToGroups func (mr *MockHostMockRecorder) SendMessageToGroups(groups, msg interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMessageToGroups", reflect.TypeOf((*MockHost)(nil).SendMessageToGroups), groups, msg) } // GroupReceiver mocks base method func (m *MockHost) GroupReceiver(arg0 p2p.GroupID) (p2p.GroupReceiver, error) { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GroupReceiver", arg0) ret0, _ := ret[0].(p2p.GroupReceiver) ret1, _ := ret[1].(error) @@ -127,5 +142,6 @@ func (m *MockHost) GroupReceiver(arg0 p2p.GroupID) (p2p.GroupReceiver, error) { // GroupReceiver indicates an expected call of GroupReceiver func (mr *MockHostMockRecorder) GroupReceiver(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GroupReceiver", reflect.TypeOf((*MockHost)(nil).GroupReceiver), arg0) } diff --git a/p2p/mock_stream.go b/p2p/mock_stream.go index b4446aa3a..6e8859bc3 100644 --- a/p2p/mock_stream.go +++ b/p2p/mock_stream.go @@ -35,6 +35,7 @@ func (m *MockStream) EXPECT() *MockStreamMockRecorder { // Read mocks base method func (m *MockStream) Read(arg0 []byte) (int, error) { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Read", arg0) ret0, _ := ret[0].(int) ret1, _ := ret[1].(error) @@ -43,11 +44,13 @@ func (m *MockStream) Read(arg0 []byte) (int, error) { // Read indicates an expected call of Read func (mr *MockStreamMockRecorder) Read(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Read", reflect.TypeOf((*MockStream)(nil).Read), arg0) } // Write mocks base method func (m *MockStream) Write(arg0 []byte) (int, error) { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Write", arg0) ret0, _ := ret[0].(int) ret1, _ := ret[1].(error) @@ -56,11 +59,13 @@ func (m *MockStream) Write(arg0 []byte) (int, error) { // Write indicates an expected call of Write func (mr *MockStreamMockRecorder) Write(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Write", reflect.TypeOf((*MockStream)(nil).Write), arg0) } // Close mocks base method func (m *MockStream) Close() error { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Close") ret0, _ := ret[0].(error) return ret0 @@ -68,11 +73,13 @@ func (m *MockStream) Close() error { // Close indicates an expected call of Close func (mr *MockStreamMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockStream)(nil).Close)) } // SetReadDeadline mocks base method func (m *MockStream) SetReadDeadline(arg0 time.Time) error { + m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SetReadDeadline", arg0) ret0, _ := ret[0].(error) return ret0 @@ -80,5 +87,6 @@ func (m *MockStream) SetReadDeadline(arg0 time.Time) error { // SetReadDeadline indicates an expected call of SetReadDeadline func (mr *MockStreamMockRecorder) SetReadDeadline(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetReadDeadline", reflect.TypeOf((*MockStream)(nil).SetReadDeadline), arg0) } diff --git a/scripts/travis_checker.sh b/scripts/travis_checker.sh index 559743ed4..d4702bd45 100755 --- a/scripts/travis_checker.sh +++ b/scripts/travis_checker.sh @@ -52,16 +52,25 @@ echo "Running go generate..." gogenerate_status_before="${tmpdir}/gogenerate_status_before.txt" gogenerate_status_after="${tmpdir}/gogenerate_status_after.txt" gogenerate_status_diff="${tmpdir}/gogenerate_status.diff" +gogenerate_output="${tmpdir}/gogenerate_output.txt" git status --porcelain=v2 > "${gogenerate_status_before}" -"${progdir}/gogenerate.sh" -git status --porcelain=v2 > "${gogenerate_status_after}" -if diff -u "${gogenerate_status_before}" "${gogenerate_status_after}" \ - > "${gogenerate_status_diff}" +if "${progdir}/gogenerate.sh" > "${gogenerate_output}" 2>&1 then - echo "go generate succeeded; all generated files seem up to date." + echo "go generate succeeded." + echo "Checking if go generate changed any files..." + git status --porcelain=v2 > "${gogenerate_status_after}" + if diff -u "${gogenerate_status_before}" "${gogenerate_status_after}" \ + > "${gogenerate_status_diff}" + then + echo "All generated files seem up to date." + else + echo "go generate changed working tree contents!" + "${progdir}/print_file.sh" "${gogenerate_status_diff}" "git status diff" + ok=false + fi else - echo "go generate changed working tree contents!" - "${progdir}/print_file.sh" "${gogenerate_status_diff}" "git status diff" + echo "go generate FAILED!" + "${progdir}/print_file.sh" "${gogenerate_output}" "go generate" ok=false fi