@ -86,8 +86,6 @@ type StateSync struct {
selfip string
selfip string
selfport string
selfport string
selfPeerHash [ 20 ] byte // hash of ip and address combination
selfPeerHash [ 20 ] byte // hash of ip and address combination
peerNumber int
activePeerNumber int
commonBlocks map [ int ] * types . Block
commonBlocks map [ int ] * types . Block
lastMileBlocks [ ] * types . Block // last mile blocks to catch up with the consensus
lastMileBlocks [ ] * types . Block // last mile blocks to catch up with the consensus
syncConfig * SyncConfig
syncConfig * SyncConfig
@ -166,11 +164,10 @@ func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) error {
if len ( peers ) == 0 {
if len ( peers ) == 0 {
return ctxerror . New ( "[SYNC] no peers to connect to" )
return ctxerror . New ( "[SYNC] no peers to connect to" )
}
}
ss . peerNumber = len ( peers )
ss . syncConfig = & SyncConfig { }
ss . syncConfig = & SyncConfig { }
var wg sync . WaitGroup
var wg sync . WaitGroup
wg . Add ( ss . peerNumber )
for _ , peer := range peers {
for _ , peer := range peers {
wg . Add ( 1 )
go func ( peer p2p . Peer ) {
go func ( peer p2p . Peer ) {
defer wg . Done ( )
defer wg . Done ( )
client := downloader . ClientSetup ( peer . IP , peer . Port )
client := downloader . ClientSetup ( peer . IP , peer . Port )
@ -186,7 +183,6 @@ func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) error {
} ( peer )
} ( peer )
}
}
wg . Wait ( )
wg . Wait ( )
ss . CleanUpNilPeers ( )
utils . GetLogInstance ( ) . Info ( "[SYNC] Finished making connection to peers." )
utils . GetLogInstance ( ) . Info ( "[SYNC] Finished making connection to peers." )
return nil
return nil
@ -194,22 +190,10 @@ func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) error {
// GetActivePeerNumber returns the number of active peers
// GetActivePeerNumber returns the number of active peers
func ( ss * StateSync ) GetActivePeerNumber ( ) int {
func ( ss * StateSync ) GetActivePeerNumber ( ) int {
if ss . syncConfig == nil || len ( ss . syncConfig . peers ) == 0 {
if ss . syncConfig == nil {
return 0
return 0
}
}
ss . CleanUpNilPeers ( )
return len ( ss . syncConfig . peers )
return ss . activePeerNumber
}
// CleanUpNilPeers cleans up peer with nil client and recalculate activePeerNumber.
func ( ss * StateSync ) CleanUpNilPeers ( ) {
ss . activePeerNumber = 0
for _ , configPeer := range ss . syncConfig . peers {
if configPeer . client != nil {
ss . activePeerNumber ++
}
}
utils . GetLogInstance ( ) . Info ( "[SYNC] clean up inactive peers" , "activeNumber" , ss . activePeerNumber )
}
}
// GetHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group.
// GetHowManyMaxConsensus returns max number of consensus nodes and the first ID of consensus group.
@ -267,9 +251,8 @@ func (ss *StateSync) GetBlockHashesConsensusAndCleanUp() bool {
} )
} )
maxFirstID , maxCount := ss . syncConfig . GetHowManyMaxConsensus ( )
maxFirstID , maxCount := ss . syncConfig . GetHowManyMaxConsensus ( )
utils . GetLogInstance ( ) . Info ( "[SYNC] block consensus hashes" , "maxFirstID" , maxFirstID , "maxCount" , maxCount )
utils . GetLogInstance ( ) . Info ( "[SYNC] block consensus hashes" , "maxFirstID" , maxFirstID , "maxCount" , maxCount )
if float64 ( maxCount ) >= ConsensusRatio * float64 ( ss . activePeerNumber ) {
if float64 ( maxCount ) >= ConsensusRatio * float64 ( len ( ss . syncConfig . peers ) ) {
ss . syncConfig . CleanUpPeers ( maxFirstID )
ss . syncConfig . CleanUpPeers ( maxFirstID )
ss . CleanUpNilPeers ( )
return true
return true
}
}
return false
return false
@ -326,7 +309,7 @@ func (ss *StateSync) generateStateSyncTaskQueue(bc *core.BlockChain) {
func ( ss * StateSync ) downloadBlocks ( bc * core . BlockChain ) {
func ( ss * StateSync ) downloadBlocks ( bc * core . BlockChain ) {
// Initialize blockchain
// Initialize blockchain
var wg sync . WaitGroup
var wg sync . WaitGroup
wg . Add ( ss . activePeerNumber )
wg . Add ( len ( ss . syncConfig . peers ) )
count := 0
count := 0
for i := range ss . syncConfig . peers {
for i := range ss . syncConfig . peers {
if ss . syncConfig . peers [ i ] . client == nil {
if ss . syncConfig . peers [ i ] . client == nil {
@ -546,9 +529,10 @@ func (peerConfig *SyncPeerConfig) registerToBroadcast(peerHash []byte, ip, port
// RegisterNodeInfo will register node to peers to accept future new block broadcasting
// RegisterNodeInfo will register node to peers to accept future new block broadcasting
// return number of successfull registration
// return number of successfull registration
func ( ss * StateSync ) RegisterNodeInfo ( ) int {
func ( ss * StateSync ) RegisterNodeInfo ( ) int {
ss . CleanUpNilPeers ( )
registrationNumber := RegistrationNumber
registrationNumber := RegistrationNumber
utils . GetLogInstance ( ) . Debug ( "[SYNC] node registration to peers" , "registrationNumber" , registrationNumber , "activePeerNumber" , ss . activePeerNumber )
utils . GetLogInstance ( ) . Debug ( "[SYNC] node registration to peers" ,
"registrationNumber" , registrationNumber ,
"activePeerNumber" , len ( ss . syncConfig . peers ) )
count := 0
count := 0
for id := range ss . syncConfig . peers {
for id := range ss . syncConfig . peers {
@ -576,7 +560,6 @@ func (ss *StateSync) RegisterNodeInfo() int {
// getMaxPeerHeight gets the maximum blockchain heights from peers
// getMaxPeerHeight gets the maximum blockchain heights from peers
func ( ss * StateSync ) getMaxPeerHeight ( ) uint64 {
func ( ss * StateSync ) getMaxPeerHeight ( ) uint64 {
ss . CleanUpNilPeers ( )
maxHeight := uint64 ( 0 )
maxHeight := uint64 ( 0 )
var wg sync . WaitGroup
var wg sync . WaitGroup
for id := range ss . syncConfig . peers {
for id := range ss . syncConfig . peers {