diff --git a/benchmark.go b/benchmark.go index 5b698c651..2b0f88e9d 100644 --- a/benchmark.go +++ b/benchmark.go @@ -224,9 +224,7 @@ func main() { } } else { if *peerDisvoery { - go func() { - currentNode.JoinShard(leader) - }() + go currentNode.JoinShard(leader) } } diff --git a/syncing/syncing.go b/syncing/syncing.go index b5a12a9b6..afd1cb892 100644 --- a/syncing/syncing.go +++ b/syncing/syncing.go @@ -51,7 +51,16 @@ type StateSync struct { stateSyncTaskQueue *queue.Queue } -func compareSyncPeerConfigByBlockHashes(a *SyncPeerConfig, b *SyncPeerConfig) int { +// CreateTestSyncPeerConfig used for testing. +func CreateTestSyncPeerConfig(client *downloader.Client, blockHashes [][]byte) *SyncPeerConfig { + return &SyncPeerConfig{ + client: client, + blockHashes: blockHashes, + } +} + +// CompareSyncPeerConfigByblockHashes compares two SyncPeerConfig by blockHashes. +func CompareSyncPeerConfigByblockHashes(a *SyncPeerConfig, b *SyncPeerConfig) int { if len(a.blockHashes) != len(b.blockHashes) { if len(a.blockHashes) < len(b.blockHashes) { return -1 @@ -114,8 +123,8 @@ func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) { } } -// makeConnectionToPeers makes grpc connection to all peers. -func (ss *StateSync) makeConnectionToPeers() { +// MakeConnectionToPeers makes grpc connection to all peers. +func (ss *StateSync) MakeConnectionToPeers() { var wg sync.WaitGroup wg.Add(ss.peerNumber) @@ -139,16 +148,16 @@ func (ss *StateSync) CleanUpNilPeers() { } } -// getHowMaxConsensus returns max number of consensus nodes and the first ID of consensus group. -// Assumption: all peers are sorted by compareSyncPeerConfigByBlockHashes first. -func (syncConfig *SyncConfig) getHowMaxConsensus() (int, int) { +// GetHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group. +// Assumption: all peers are sorted by CompareSyncPeerConfigByBlockHashes first. +func (syncConfig *SyncConfig) GetHowManyMaxConsensus() (int, int) { // As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively. curCount := 0 curFirstID := -1 maxCount := 0 maxFirstID := -1 for i := range syncConfig.peers { - if curFirstID == -1 || compareSyncPeerConfigByBlockHashes(syncConfig.peers[curFirstID], syncConfig.peers[i]) != 0 { + if curFirstID == -1 || CompareSyncPeerConfigByblockHashes(syncConfig.peers[curFirstID], syncConfig.peers[i]) != 0 { curCount = 1 curFirstID = i } else { @@ -162,10 +171,19 @@ func (syncConfig *SyncConfig) getHowMaxConsensus() (int, int) { return maxFirstID, maxCount } +// InitForTesting used for testing. +func (syncConfig *SyncConfig) InitForTesting(client *downloader.Client, blockHashes [][]byte) { + for i := range syncConfig.peers { + syncConfig.peers[i].blockHashes = blockHashes + syncConfig.peers[i].client = client + } +} + // CleanUpPeers cleans up all peers whose blockHashes are not equal to consensus block hashes. func (syncConfig *SyncConfig) CleanUpPeers(maxFirstID int) { - for i := range syncConfig.peers { - if compareSyncPeerConfigByBlockHashes(syncConfig.peers[maxFirstID], syncConfig.peers[i]) != 0 { + fixedPeer := syncConfig.peers[maxFirstID] + for i := 0; i < len(syncConfig.peers); i++ { + if CompareSyncPeerConfigByblockHashes(fixedPeer, syncConfig.peers[i]) != 0 { // TODO: move it into a util delete func. // See tip https://github.com/golang/go/wiki/SliceTricks // Close the client and remove the peer out of the @@ -177,13 +195,14 @@ func (syncConfig *SyncConfig) CleanUpPeers(maxFirstID int) { } } -// getBlockHashesConsensusAndCleanUp chesk if all consensus hashes are equal. -func (ss *StateSync) getBlockHashesConsensusAndCleanUp() bool { +// GetBlockHashesConsensusAndCleanUp chesk if all consensus hashes are equal. +func (ss *StateSync) GetBlockHashesConsensusAndCleanUp() bool { // Sort all peers by the blockHashes. sort.Slice(ss.syncConfig.peers, func(i, j int) bool { - return compareSyncPeerConfigByBlockHashes(ss.syncConfig.peers[i], ss.syncConfig.peers[j]) == -1 + return CompareSyncPeerConfigByblockHashes(ss.syncConfig.peers[i], ss.syncConfig.peers[j]) == -1 }) - maxCount, maxFirstID := ss.syncConfig.getHowMaxConsensus() + + maxFirstID, maxCount := ss.syncConfig.GetHowManyMaxConsensus() if float64(maxCount) >= ConsensusRatio*float64(ss.activePeerNumber) { ss.syncConfig.CleanUpPeers(maxFirstID) ss.CleanUpNilPeers() @@ -192,8 +211,8 @@ func (ss *StateSync) getBlockHashesConsensusAndCleanUp() bool { return false } -// getConsensusHashes gets all hashes needed to download. -func (ss *StateSync) getConsensusHashes() { +// GetConsensusHashes gets all hashes needed to download. +func (ss *StateSync) GetConsensusHashes() { for { var wg sync.WaitGroup wg.Add(ss.activePeerNumber) @@ -209,7 +228,7 @@ func (ss *StateSync) getConsensusHashes() { }(ss.syncConfig.peers[id]) } wg.Wait() - if ss.getBlockHashesConsensusAndCleanUp() { + if ss.GetBlockHashesConsensusAndCleanUp() { break } } @@ -272,10 +291,10 @@ func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) // Creates sync config. ss.CreateSyncConfig(peers) // Makes connections to peers. - ss.makeConnectionToPeers() + ss.MakeConnectionToPeers() for { // Gets consensus hashes. - ss.getConsensusHashes() + ss.GetConsensusHashes() // Generates state-sync task queue. ss.generateStateSyncTaskQueue(bc) diff --git a/syncing/syncing_test.go b/syncing/syncing_test.go index 30d755692..57eb60dce 100644 --- a/syncing/syncing_test.go +++ b/syncing/syncing_test.go @@ -10,6 +10,7 @@ import ( "github.com/harmony-one/harmony/syncing" "github.com/harmony-one/harmony/syncing/downloader" pb "github.com/harmony-one/harmony/syncing/downloader/proto" + "github.com/stretchr/testify/assert" "google.golang.org/grpc" ) @@ -69,6 +70,16 @@ func (node *FakeNode) Init(ip, port string) { node.server = downloader.NewServer(node) } +// SetBlockchain is used for testing +func (node *FakeNode) Init2(ip, port string) { + addresses := [][20]byte{TestAddressOne} + node.bc = bc.CreateBlockchainWithMoreBlocks(addresses, ShardID) + node.ip = ip + node.port = port + + node.server = downloader.NewServer(node) +} + // Start ... func (node *FakeNode) Start() error { var err error @@ -100,6 +111,16 @@ func (node *FakeNode) CalculateResponse(request *pb.DownloaderRequest) (*pb.Down return response, nil } +func TestCompareSyncPeerConfigByBlockHashes(t *testing.T) { + a := syncing.CreateTestSyncPeerConfig(nil, [][]byte{{1, 2, 3, 4, 5, 6}, {1, 2, 3, 4, 5, 6}}) + b := syncing.CreateTestSyncPeerConfig(nil, [][]byte{{1, 2, 3, 4, 5, 6}, {1, 2, 3, 4, 5, 6}}) + assert.Equal(t, syncing.CompareSyncPeerConfigByblockHashes(a, b), 0, "they should be equal") + c := syncing.CreateTestSyncPeerConfig(nil, [][]byte{{1, 2, 3, 4, 5, 7}, {1, 2, 3, 4, 5, 6}}) + assert.Equal(t, syncing.CompareSyncPeerConfigByblockHashes(a, c), -1, "a should be less than c") + d := syncing.CreateTestSyncPeerConfig(nil, [][]byte{{1, 2, 3, 4, 5, 4}, {1, 2, 3, 4, 5, 6}}) + assert.Equal(t, syncing.CompareSyncPeerConfigByblockHashes(a, d), 1, "a should be greater than c") +} + func TestSyncing(t *testing.T) { fakeNodes := []*FakeNode{&FakeNode{}, &FakeNode{}, &FakeNode{}} for i := range fakeNodes { @@ -108,6 +129,11 @@ func TestSyncing(t *testing.T) { t.Error(err) } } + defer func() { + for _, fakeNode := range fakeNodes { + fakeNode.grpcServer.Stop() + } + }() stateSync := &syncing.StateSync{} bc := &bc.Blockchain{} @@ -125,7 +151,41 @@ func TestSyncing(t *testing.T) { } } - for _, fakeNode := range fakeNodes { - fakeNode.grpcServer.Stop() +} + +func TestSyncingIncludingBadNode(t *testing.T) { + fakeNodes := []*FakeNode{&FakeNode{}, &FakeNode{}, &FakeNode{}} + for i := range fakeNodes { + if i == 2 { + // Bad node. + fakeNodes[i].Init2(serverIP, ServerPorts[i]) + } else { + // Good node. + fakeNodes[i].Init(serverIP, ServerPorts[i]) + } + if err := fakeNodes[i].Start(); err != nil { + t.Error(err) + } + } + defer func() { + for _, fakeNode := range fakeNodes { + fakeNode.grpcServer.Stop() + } + }() + + stateSync := &syncing.StateSync{} + bc := &bc.Blockchain{} + peers := make([]p2p.Peer, len(fakeNodes)) + for i := range peers { + peers[i].Ip = fakeNodes[i].ip + peers[i].Port = fakeNodes[i].port + } + + stateSync.StartStateSync(peers, bc) + + for i := range bc.Blocks { + if !reflect.DeepEqual(bc.Blocks[i], fakeNodes[0].bc.Blocks[i]) { + t.Error("not equal") + } } }