add more tests when bad node participarting in syncing

pull/110/head^2^2
Minh Doan 6 years ago
parent 14d130566d
commit a0a4656315
  1. 4
      benchmark.go
  2. 55
      syncing/syncing.go
  3. 64
      syncing/syncing_test.go

@ -224,9 +224,7 @@ func main() {
} }
} else { } else {
if *peerDisvoery { if *peerDisvoery {
go func() { go currentNode.JoinShard(leader)
currentNode.JoinShard(leader)
}()
} }
} }

@ -51,7 +51,16 @@ type StateSync struct {
stateSyncTaskQueue *queue.Queue stateSyncTaskQueue *queue.Queue
} }
func compareSyncPeerConfigByBlockHashes(a *SyncPeerConfig, b *SyncPeerConfig) int { // CreateTestSyncPeerConfig used for testing.
func CreateTestSyncPeerConfig(client *downloader.Client, blockHashes [][]byte) *SyncPeerConfig {
return &SyncPeerConfig{
client: client,
blockHashes: blockHashes,
}
}
// CompareSyncPeerConfigByblockHashes compares two SyncPeerConfig by blockHashes.
func CompareSyncPeerConfigByblockHashes(a *SyncPeerConfig, b *SyncPeerConfig) int {
if len(a.blockHashes) != len(b.blockHashes) { if len(a.blockHashes) != len(b.blockHashes) {
if len(a.blockHashes) < len(b.blockHashes) { if len(a.blockHashes) < len(b.blockHashes) {
return -1 return -1
@ -114,8 +123,8 @@ func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) {
} }
} }
// makeConnectionToPeers makes grpc connection to all peers. // MakeConnectionToPeers makes grpc connection to all peers.
func (ss *StateSync) makeConnectionToPeers() { func (ss *StateSync) MakeConnectionToPeers() {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(ss.peerNumber) wg.Add(ss.peerNumber)
@ -139,16 +148,16 @@ func (ss *StateSync) CleanUpNilPeers() {
} }
} }
// getHowMaxConsensus returns max number of consensus nodes and the first ID of consensus group. // GetHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group.
// Assumption: all peers are sorted by compareSyncPeerConfigByBlockHashes first. // Assumption: all peers are sorted by CompareSyncPeerConfigByBlockHashes first.
func (syncConfig *SyncConfig) getHowMaxConsensus() (int, int) { func (syncConfig *SyncConfig) GetHowManyMaxConsensus() (int, int) {
// As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively. // As all peers are sorted by their blockHashes, all equal blockHashes should come together and consecutively.
curCount := 0 curCount := 0
curFirstID := -1 curFirstID := -1
maxCount := 0 maxCount := 0
maxFirstID := -1 maxFirstID := -1
for i := range syncConfig.peers { for i := range syncConfig.peers {
if curFirstID == -1 || compareSyncPeerConfigByBlockHashes(syncConfig.peers[curFirstID], syncConfig.peers[i]) != 0 { if curFirstID == -1 || CompareSyncPeerConfigByblockHashes(syncConfig.peers[curFirstID], syncConfig.peers[i]) != 0 {
curCount = 1 curCount = 1
curFirstID = i curFirstID = i
} else { } else {
@ -162,10 +171,19 @@ func (syncConfig *SyncConfig) getHowMaxConsensus() (int, int) {
return maxFirstID, maxCount return maxFirstID, maxCount
} }
// InitForTesting used for testing.
func (syncConfig *SyncConfig) InitForTesting(client *downloader.Client, blockHashes [][]byte) {
for i := range syncConfig.peers {
syncConfig.peers[i].blockHashes = blockHashes
syncConfig.peers[i].client = client
}
}
// CleanUpPeers cleans up all peers whose blockHashes are not equal to consensus block hashes. // CleanUpPeers cleans up all peers whose blockHashes are not equal to consensus block hashes.
func (syncConfig *SyncConfig) CleanUpPeers(maxFirstID int) { func (syncConfig *SyncConfig) CleanUpPeers(maxFirstID int) {
for i := range syncConfig.peers { fixedPeer := syncConfig.peers[maxFirstID]
if compareSyncPeerConfigByBlockHashes(syncConfig.peers[maxFirstID], syncConfig.peers[i]) != 0 { for i := 0; i < len(syncConfig.peers); i++ {
if CompareSyncPeerConfigByblockHashes(fixedPeer, syncConfig.peers[i]) != 0 {
// TODO: move it into a util delete func. // TODO: move it into a util delete func.
// See tip https://github.com/golang/go/wiki/SliceTricks // See tip https://github.com/golang/go/wiki/SliceTricks
// Close the client and remove the peer out of the // Close the client and remove the peer out of the
@ -177,13 +195,14 @@ func (syncConfig *SyncConfig) CleanUpPeers(maxFirstID int) {
} }
} }
// getBlockHashesConsensusAndCleanUp chesk if all consensus hashes are equal. // GetBlockHashesConsensusAndCleanUp chesk if all consensus hashes are equal.
func (ss *StateSync) getBlockHashesConsensusAndCleanUp() bool { func (ss *StateSync) GetBlockHashesConsensusAndCleanUp() bool {
// Sort all peers by the blockHashes. // Sort all peers by the blockHashes.
sort.Slice(ss.syncConfig.peers, func(i, j int) bool { sort.Slice(ss.syncConfig.peers, func(i, j int) bool {
return compareSyncPeerConfigByBlockHashes(ss.syncConfig.peers[i], ss.syncConfig.peers[j]) == -1 return CompareSyncPeerConfigByblockHashes(ss.syncConfig.peers[i], ss.syncConfig.peers[j]) == -1
}) })
maxCount, maxFirstID := ss.syncConfig.getHowMaxConsensus()
maxFirstID, maxCount := ss.syncConfig.GetHowManyMaxConsensus()
if float64(maxCount) >= ConsensusRatio*float64(ss.activePeerNumber) { if float64(maxCount) >= ConsensusRatio*float64(ss.activePeerNumber) {
ss.syncConfig.CleanUpPeers(maxFirstID) ss.syncConfig.CleanUpPeers(maxFirstID)
ss.CleanUpNilPeers() ss.CleanUpNilPeers()
@ -192,8 +211,8 @@ func (ss *StateSync) getBlockHashesConsensusAndCleanUp() bool {
return false return false
} }
// getConsensusHashes gets all hashes needed to download. // GetConsensusHashes gets all hashes needed to download.
func (ss *StateSync) getConsensusHashes() { func (ss *StateSync) GetConsensusHashes() {
for { for {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(ss.activePeerNumber) wg.Add(ss.activePeerNumber)
@ -209,7 +228,7 @@ func (ss *StateSync) getConsensusHashes() {
}(ss.syncConfig.peers[id]) }(ss.syncConfig.peers[id])
} }
wg.Wait() wg.Wait()
if ss.getBlockHashesConsensusAndCleanUp() { if ss.GetBlockHashesConsensusAndCleanUp() {
break break
} }
} }
@ -272,10 +291,10 @@ func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *blockchain.Blockchain)
// Creates sync config. // Creates sync config.
ss.CreateSyncConfig(peers) ss.CreateSyncConfig(peers)
// Makes connections to peers. // Makes connections to peers.
ss.makeConnectionToPeers() ss.MakeConnectionToPeers()
for { for {
// Gets consensus hashes. // Gets consensus hashes.
ss.getConsensusHashes() ss.GetConsensusHashes()
// Generates state-sync task queue. // Generates state-sync task queue.
ss.generateStateSyncTaskQueue(bc) ss.generateStateSyncTaskQueue(bc)

@ -10,6 +10,7 @@ import (
"github.com/harmony-one/harmony/syncing" "github.com/harmony-one/harmony/syncing"
"github.com/harmony-one/harmony/syncing/downloader" "github.com/harmony-one/harmony/syncing/downloader"
pb "github.com/harmony-one/harmony/syncing/downloader/proto" pb "github.com/harmony-one/harmony/syncing/downloader/proto"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc" "google.golang.org/grpc"
) )
@ -69,6 +70,16 @@ func (node *FakeNode) Init(ip, port string) {
node.server = downloader.NewServer(node) node.server = downloader.NewServer(node)
} }
// SetBlockchain is used for testing
func (node *FakeNode) Init2(ip, port string) {
addresses := [][20]byte{TestAddressOne}
node.bc = bc.CreateBlockchainWithMoreBlocks(addresses, ShardID)
node.ip = ip
node.port = port
node.server = downloader.NewServer(node)
}
// Start ... // Start ...
func (node *FakeNode) Start() error { func (node *FakeNode) Start() error {
var err error var err error
@ -100,6 +111,16 @@ func (node *FakeNode) CalculateResponse(request *pb.DownloaderRequest) (*pb.Down
return response, nil return response, nil
} }
func TestCompareSyncPeerConfigByBlockHashes(t *testing.T) {
a := syncing.CreateTestSyncPeerConfig(nil, [][]byte{{1, 2, 3, 4, 5, 6}, {1, 2, 3, 4, 5, 6}})
b := syncing.CreateTestSyncPeerConfig(nil, [][]byte{{1, 2, 3, 4, 5, 6}, {1, 2, 3, 4, 5, 6}})
assert.Equal(t, syncing.CompareSyncPeerConfigByblockHashes(a, b), 0, "they should be equal")
c := syncing.CreateTestSyncPeerConfig(nil, [][]byte{{1, 2, 3, 4, 5, 7}, {1, 2, 3, 4, 5, 6}})
assert.Equal(t, syncing.CompareSyncPeerConfigByblockHashes(a, c), -1, "a should be less than c")
d := syncing.CreateTestSyncPeerConfig(nil, [][]byte{{1, 2, 3, 4, 5, 4}, {1, 2, 3, 4, 5, 6}})
assert.Equal(t, syncing.CompareSyncPeerConfigByblockHashes(a, d), 1, "a should be greater than c")
}
func TestSyncing(t *testing.T) { func TestSyncing(t *testing.T) {
fakeNodes := []*FakeNode{&FakeNode{}, &FakeNode{}, &FakeNode{}} fakeNodes := []*FakeNode{&FakeNode{}, &FakeNode{}, &FakeNode{}}
for i := range fakeNodes { for i := range fakeNodes {
@ -108,6 +129,11 @@ func TestSyncing(t *testing.T) {
t.Error(err) t.Error(err)
} }
} }
defer func() {
for _, fakeNode := range fakeNodes {
fakeNode.grpcServer.Stop()
}
}()
stateSync := &syncing.StateSync{} stateSync := &syncing.StateSync{}
bc := &bc.Blockchain{} bc := &bc.Blockchain{}
@ -125,7 +151,41 @@ func TestSyncing(t *testing.T) {
} }
} }
for _, fakeNode := range fakeNodes { }
fakeNode.grpcServer.Stop()
func TestSyncingIncludingBadNode(t *testing.T) {
fakeNodes := []*FakeNode{&FakeNode{}, &FakeNode{}, &FakeNode{}}
for i := range fakeNodes {
if i == 2 {
// Bad node.
fakeNodes[i].Init2(serverIP, ServerPorts[i])
} else {
// Good node.
fakeNodes[i].Init(serverIP, ServerPorts[i])
}
if err := fakeNodes[i].Start(); err != nil {
t.Error(err)
}
}
defer func() {
for _, fakeNode := range fakeNodes {
fakeNode.grpcServer.Stop()
}
}()
stateSync := &syncing.StateSync{}
bc := &bc.Blockchain{}
peers := make([]p2p.Peer, len(fakeNodes))
for i := range peers {
peers[i].Ip = fakeNodes[i].ip
peers[i].Port = fakeNodes[i].port
}
stateSync.StartStateSync(peers, bc)
for i := range bc.Blocks {
if !reflect.DeepEqual(bc.Blocks[i], fakeNodes[0].bc.Blocks[i]) {
t.Error("not equal")
}
} }
} }

Loading…
Cancel
Save