replace consensus blsAddr with new syncID for state syncing

pull/615/head
chao 6 years ago committed by chaosma
parent af8139f1e3
commit ebd5b8e240
  1. 18
      api/service/syncing/downloader/client.go
  2. 66
      api/service/syncing/downloader/proto/downloader.pb.go
  3. 2
      api/service/syncing/downloader/proto/downloader.proto
  4. 16
      api/service/syncing/syncing.go
  5. 9
      node/node.go
  6. 42
      node/node_syncing.go
  7. 21
      node/node_utils.go

@ -22,7 +22,7 @@ func ClientSetup(ip, port string) *Client {
client := Client{} client := Client{}
client.opts = append(client.opts, grpc.WithInsecure()) client.opts = append(client.opts, grpc.WithInsecure())
var err error var err error
client.conn, err = grpc.Dial(fmt.Sprintf("%s:%s", ip, port), client.opts...) client.conn, err = grpc.Dial(fmt.Sprintf(ip+":"+port), client.opts...)
if err != nil { if err != nil {
utils.GetLogInstance().Info("client.go:ClientSetup fail to dial: ", "error", err) utils.GetLogInstance().Info("client.go:ClientSetup fail to dial: ", "error", err)
return nil return nil
@ -71,29 +71,31 @@ func (client *Client) GetBlocks(hashes [][]byte) *pb.DownloaderResponse {
// Register will register node's ip/port information to peers receive newly created blocks in future // Register will register node's ip/port information to peers receive newly created blocks in future
// hash is the bytes of "ip:port" string representation // hash is the bytes of "ip:port" string representation
func (client *Client) Register(hash []byte) *pb.DownloaderResponse { func (client *Client) Register(hash []byte, ip, port string) *pb.DownloaderResponse {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel() defer cancel()
request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_REGISTER} request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_REGISTER}
request.PeerHash = make([]byte, len(hash)) request.PeerHash = make([]byte, len(hash))
copy(request.PeerHash, hash) copy(request.PeerHash, hash)
request.Ip = ip
request.Port = port
response, err := client.dlClient.Query(ctx, request) response, err := client.dlClient.Query(ctx, request)
if err != nil { if err != nil || response == nil {
utils.GetLogInstance().Info("[SYNC] client.go:Register failed.", "error", err) utils.GetLogInstance().Info("[SYNC] client.go:Register failed.", "error", err, "response", response)
} }
return response return response
} }
// PushNewBlock will send the lastest verified blow to registered nodes // PushNewBlock will send the lastest verified block to registered nodes
func (client *Client) PushNewBlock(peerAddress [20]byte, blockHash []byte, timeout bool) *pb.DownloaderResponse { func (client *Client) PushNewBlock(selfPeerHash [20]byte, blockHash []byte, timeout bool) *pb.DownloaderResponse {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_NEWBLOCK} request := &pb.DownloaderRequest{Type: pb.DownloaderRequest_NEWBLOCK}
request.BlockHash = make([]byte, len(blockHash)) request.BlockHash = make([]byte, len(blockHash))
copy(request.BlockHash, blockHash) copy(request.BlockHash, blockHash)
request.PeerHash = make([]byte, len(peerAddress)) request.PeerHash = make([]byte, len(selfPeerHash))
copy(request.PeerHash, peerAddress[:]) copy(request.PeerHash, selfPeerHash[:])
if timeout { if timeout {
request.Type = pb.DownloaderRequest_REGISTERTIMEOUT request.Type = pb.DownloaderRequest_REGISTERTIMEOUT

@ -98,6 +98,8 @@ type DownloaderRequest struct {
Hashes [][]byte `protobuf:"bytes,2,rep,name=hashes,proto3" json:"hashes,omitempty"` Hashes [][]byte `protobuf:"bytes,2,rep,name=hashes,proto3" json:"hashes,omitempty"`
PeerHash []byte `protobuf:"bytes,3,opt,name=peerHash,proto3" json:"peerHash,omitempty"` PeerHash []byte `protobuf:"bytes,3,opt,name=peerHash,proto3" json:"peerHash,omitempty"`
BlockHash []byte `protobuf:"bytes,4,opt,name=blockHash,proto3" json:"blockHash,omitempty"` BlockHash []byte `protobuf:"bytes,4,opt,name=blockHash,proto3" json:"blockHash,omitempty"`
Ip string `protobuf:"bytes,5,opt,name=ip,proto3" json:"ip,omitempty"`
Port string `protobuf:"bytes,6,opt,name=port,proto3" json:"port,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -156,6 +158,20 @@ func (m *DownloaderRequest) GetBlockHash() []byte {
return nil return nil
} }
func (m *DownloaderRequest) GetIp() string {
if m != nil {
return m.Ip
}
return ""
}
func (m *DownloaderRequest) GetPort() string {
if m != nil {
return m.Port
}
return ""
}
// DownloaderResponse is the generic response of DownloaderRequest. // DownloaderResponse is the generic response of DownloaderRequest.
type DownloaderResponse struct { type DownloaderResponse struct {
// payload of Block. // payload of Block.
@ -224,30 +240,32 @@ func init() {
func init() { proto.RegisterFile("downloader.proto", fileDescriptor_6a99ec95c7ab1ff1) } func init() { proto.RegisterFile("downloader.proto", fileDescriptor_6a99ec95c7ab1ff1) }
var fileDescriptor_6a99ec95c7ab1ff1 = []byte{ var fileDescriptor_6a99ec95c7ab1ff1 = []byte{
// 364 bytes of a gzipped FileDescriptorProto // 388 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x92, 0xcf, 0xae, 0x9a, 0x40, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x7c, 0x92, 0xdf, 0x6f, 0x94, 0x40,
0x14, 0xc6, 0x05, 0x91, 0xeb, 0x3d, 0x98, 0x3a, 0x3d, 0x6d, 0x1a, 0x62, 0xda, 0x86, 0xb0, 0xb2, 0x10, 0xc7, 0x6f, 0x39, 0xa0, 0x77, 0xc3, 0xa5, 0x5d, 0x47, 0x63, 0x48, 0xa3, 0x86, 0xf0, 0x84,
0x1b, 0x16, 0xba, 0xea, 0xa2, 0x0b, 0x8b, 0x53, 0x21, 0x5a, 0x4c, 0x07, 0xac, 0xe9, 0x12, 0xeb, 0x2f, 0x3c, 0xb4, 0x4f, 0x3e, 0xf8, 0x50, 0xe9, 0x7a, 0x90, 0x56, 0x2e, 0x2e, 0x9c, 0x8d, 0x8f,
0x44, 0x4c, 0x8d, 0x50, 0x06, 0xd3, 0xf0, 0x28, 0x7d, 0xaf, 0x3e, 0x50, 0xc3, 0xf8, 0x07, 0x92, 0xd4, 0x6e, 0x0a, 0xb1, 0x29, 0x2b, 0x4b, 0x63, 0xf8, 0xe3, 0xfc, 0x2f, 0xfc, 0x83, 0x0c, 0x7b,
0xf6, 0xba, 0x82, 0xef, 0x3b, 0x73, 0xce, 0xcc, 0xf7, 0x9b, 0x01, 0xb2, 0x4d, 0x7f, 0x1d, 0x0f, 0x3f, 0x20, 0x51, 0xef, 0x69, 0xe7, 0xfb, 0x9d, 0x9d, 0xc9, 0xcc, 0x27, 0x03, 0xf4, 0xae, 0xfe,
0x69, 0xbc, 0xe5, 0xb9, 0x93, 0xe5, 0x69, 0x91, 0x22, 0xd4, 0x8e, 0xfd, 0x5b, 0x85, 0xe7, 0xd3, 0xf9, 0xf8, 0x50, 0x17, 0x77, 0xa2, 0x09, 0x65, 0x53, 0xb7, 0x35, 0xc2, 0xe0, 0xf8, 0xbf, 0x0c,
0x9b, 0x64, 0xfc, 0xe7, 0x89, 0x8b, 0x02, 0x3f, 0x80, 0x56, 0x94, 0x19, 0x37, 0x15, 0x4b, 0x19, 0x78, 0x76, 0xb9, 0x97, 0x5c, 0xfc, 0x78, 0x12, 0xaa, 0xc5, 0xf7, 0x60, 0xb6, 0x9d, 0x14, 0x2e,
0x3e, 0x1b, 0xbd, 0x73, 0x1a, 0x23, 0xfe, 0x59, 0xec, 0x5c, 0xbe, 0x51, 0x99, 0x71, 0x26, 0xdb, 0xf1, 0x48, 0x70, 0x7c, 0xf6, 0x36, 0x1c, 0xb5, 0xf8, 0xeb, 0x73, 0xb8, 0x7d, 0xf3, 0x4e, 0x0a,
0xf0, 0x15, 0xe8, 0x49, 0x2c, 0x12, 0x2e, 0x4c, 0xd5, 0x6a, 0x0f, 0x7b, 0xec, 0xa2, 0x70, 0x00, 0xae, 0xcb, 0xf0, 0x25, 0xd8, 0x65, 0xa1, 0x4a, 0xa1, 0x5c, 0xc3, 0x9b, 0x06, 0x0b, 0xbe, 0x55,
0xdd, 0x8c, 0xf3, 0xdc, 0x8b, 0x45, 0x62, 0xb6, 0x2d, 0x65, 0xd8, 0x63, 0x37, 0x8d, 0xaf, 0xe1, 0x78, 0x0a, 0x33, 0x29, 0x44, 0x13, 0x17, 0xaa, 0x74, 0xa7, 0x1e, 0x09, 0x16, 0x7c, 0xaf, 0xf1,
0x71, 0x73, 0x48, 0xbf, 0xff, 0x90, 0x45, 0x4d, 0x16, 0x6b, 0xc3, 0x16, 0x60, 0x34, 0xb6, 0x41, 0x15, 0xcc, 0x6f, 0x1f, 0xea, 0x6f, 0xdf, 0x75, 0xd2, 0xd4, 0xc9, 0xc1, 0xc0, 0x63, 0x30, 0x2a,
0x00, 0xdd, 0xa3, 0x93, 0x29, 0x65, 0xa4, 0x85, 0x8f, 0xd0, 0xf9, 0xb8, 0x58, 0xba, 0x73, 0xa2, 0xe9, 0x5a, 0x1e, 0x09, 0xe6, 0xdc, 0xa8, 0x24, 0x22, 0x98, 0xb2, 0x6e, 0x5a, 0xd7, 0xd6, 0x8e,
0x60, 0x0f, 0xba, 0x01, 0x5d, 0x9f, 0x95, 0x8a, 0x7d, 0x30, 0xe4, 0xaf, 0x47, 0xfd, 0x99, 0x17, 0x8e, 0x7d, 0x05, 0xce, 0x68, 0x14, 0x04, 0xb0, 0x63, 0x76, 0x71, 0xc9, 0x38, 0x9d, 0xe0, 0x1c,
0x91, 0x76, 0x55, 0x66, 0x74, 0xe6, 0x87, 0x11, 0x65, 0x44, 0xc3, 0x17, 0xd0, 0xbf, 0xaa, 0xc8, 0xac, 0x0f, 0xd7, 0xab, 0xe8, 0x8a, 0x12, 0x5c, 0xc0, 0x2c, 0x65, 0x37, 0x1b, 0x65, 0xe0, 0x09,
0xff, 0x4c, 0x97, 0xab, 0x88, 0x74, 0xd0, 0x80, 0x87, 0x55, 0x30, 0x0f, 0x96, 0xeb, 0x80, 0xe8, 0x38, 0x3a, 0x8c, 0x59, 0xb2, 0x8c, 0x73, 0x3a, 0xed, 0xd3, 0x9c, 0x2d, 0x93, 0x2c, 0x67, 0x9c,
0xf6, 0x1f, 0x05, 0xb0, 0x19, 0x57, 0x64, 0xe9, 0x51, 0x70, 0x34, 0xe1, 0x21, 0x8b, 0xcb, 0xca, 0x9a, 0xf8, 0x1c, 0x4e, 0x76, 0x2a, 0x4f, 0x3e, 0xb1, 0xd5, 0x3a, 0xa7, 0x16, 0x3a, 0x70, 0xb4,
0x34, 0x15, 0x19, 0xef, 0x2a, 0x71, 0x76, 0xc1, 0xa6, 0x4a, 0x6c, 0xe3, 0xa7, 0xb0, 0x9d, 0xe7, 0x4e, 0xaf, 0xd2, 0xd5, 0x4d, 0x4a, 0x6d, 0xff, 0x37, 0x01, 0x1c, 0x23, 0x51, 0xb2, 0x7e, 0x54,
0x38, 0x8c, 0xef, 0xf6, 0xa2, 0xa8, 0x8d, 0x06, 0x40, 0x0b, 0x8c, 0x73, 0x76, 0xbe, 0xdf, 0x25, 0x02, 0x5d, 0x38, 0x92, 0x45, 0xd7, 0x9b, 0x2e, 0xd1, 0x08, 0x76, 0x12, 0x97, 0x5b, 0xb4, 0x86,
0x85, 0x64, 0xa5, 0xb1, 0xa6, 0x65, 0xbf, 0x87, 0x97, 0xff, 0xeb, 0xaf, 0x02, 0x84, 0x2b, 0xd7, 0x46, 0x7b, 0xfe, 0x3f, 0xb4, 0x9b, 0x3e, 0x21, 0x17, 0xf7, 0x95, 0x6a, 0x07, 0x63, 0x04, 0xd9,
0xa5, 0x61, 0x48, 0x5a, 0xd8, 0x05, 0xed, 0xd3, 0xc4, 0x5f, 0x10, 0xa5, 0x02, 0xe6, 0x07, 0xe1, 0x03, 0x67, 0xc3, 0x47, 0x54, 0xf7, 0x65, 0xab, 0x79, 0x9a, 0x7c, 0x6c, 0xf9, 0xef, 0xe0, 0xc5,
0xb7, 0xc0, 0x25, 0xea, 0xe8, 0x2b, 0x40, 0x7d, 0x1a, 0xf4, 0xa0, 0xf3, 0xe5, 0xc4, 0xf3, 0x12, 0xbf, 0xea, 0xfb, 0x05, 0xb2, 0x75, 0x14, 0xb1, 0x2c, 0xa3, 0x13, 0x9c, 0x81, 0xf9, 0xf1, 0x22,
0xdf, 0xdc, 0xbd, 0xe5, 0xc1, 0xdb, 0xfb, 0x69, 0xec, 0xd6, 0x46, 0x97, 0xaf, 0x6b, 0xfc, 0x37, 0xb9, 0xa6, 0xa4, 0x07, 0x96, 0xa4, 0xd9, 0xd7, 0x34, 0xa2, 0xc6, 0xd9, 0x17, 0x80, 0x61, 0x1a,
0x00, 0x00, 0xff, 0xff, 0x60, 0xac, 0xf6, 0x1e, 0x71, 0x02, 0x00, 0x00, 0x8c, 0xc1, 0xfa, 0xfc, 0x24, 0x9a, 0x0e, 0x5f, 0x1f, 0xbc, 0x84, 0xd3, 0x37, 0x87, 0xb7, 0xf1,
0x27, 0xb7, 0xb6, 0xbe, 0xc0, 0xf3, 0x3f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x45, 0x42, 0x00, 0x51,
0x95, 0x02, 0x00, 0x00,
} }
// Reference imports to suppress errors if they are not otherwise used. // Reference imports to suppress errors if they are not otherwise used.

@ -26,6 +26,8 @@ message DownloaderRequest {
repeated bytes hashes = 2; repeated bytes hashes = 2;
bytes peerHash = 3; bytes peerHash = 3;
bytes blockHash = 4; bytes blockHash = 4;
string ip = 5;
string port = 6;
} }
// DownloaderResponse is the generic response of DownloaderRequest. // DownloaderResponse is the generic response of DownloaderRequest.

@ -57,11 +57,11 @@ type SyncConfig struct {
} }
// CreateStateSync returns the implementation of StateSyncInterface interface. // CreateStateSync returns the implementation of StateSyncInterface interface.
func CreateStateSync(ip string, port string, addr [20]byte) *StateSync { func CreateStateSync(ip string, port string, peerHash [20]byte) *StateSync {
stateSync := &StateSync{} stateSync := &StateSync{}
stateSync.selfip = ip stateSync.selfip = ip
stateSync.selfport = port stateSync.selfport = port
stateSync.selfAddress = addr stateSync.selfPeerHash = peerHash
stateSync.commonBlocks = make(map[int]*types.Block) stateSync.commonBlocks = make(map[int]*types.Block)
stateSync.lastMileBlocks = []*types.Block{} stateSync.lastMileBlocks = []*types.Block{}
return stateSync return stateSync
@ -71,10 +71,10 @@ func CreateStateSync(ip string, port string, addr [20]byte) *StateSync {
type StateSync struct { type StateSync struct {
selfip string selfip string
selfport string selfport string
selfPeerHash [20]byte // hash of ip and address combination
peerNumber int peerNumber int
activePeerNumber int activePeerNumber int
currentHeight uint64 // current height of local blockchain currentHeight uint64 // current height of local blockchain
selfAddress [20]byte // address of my BLS key
commonBlocks map[int]*types.Block commonBlocks map[int]*types.Block
lastMileBlocks []*types.Block // last mile blocks to catch up with the consensus lastMileBlocks []*types.Block // last mile blocks to catch up with the consensus
syncConfig *SyncConfig syncConfig *SyncConfig
@ -516,8 +516,8 @@ func (ss *StateSync) ProcessStateSync(startHash []byte, bc *core.BlockChain, wor
ss.generateNewState(bc, worker) ss.generateNewState(bc, worker)
} }
func (peerConfig *SyncPeerConfig) registerToBroadcast(peerHash []byte) error { func (peerConfig *SyncPeerConfig) registerToBroadcast(peerHash []byte, ip, port string) error {
response := peerConfig.client.Register(peerHash) response := peerConfig.client.Register(peerHash, ip, port)
if response == nil || response.Type == pb.DownloaderResponse_FAIL { if response == nil || response.Type == pb.DownloaderResponse_FAIL {
return ErrRegistrationFail return ErrRegistrationFail
} else if response.Type == pb.DownloaderResponse_SUCCESS { } else if response.Type == pb.DownloaderResponse_SUCCESS {
@ -542,9 +542,9 @@ func (ss *StateSync) RegisterNodeInfo() int {
if peerConfig.client == nil { if peerConfig.client == nil {
continue continue
} }
err := peerConfig.registerToBroadcast(ss.selfAddress[:]) err := peerConfig.registerToBroadcast(ss.selfPeerHash[:], ss.selfip, ss.selfport)
if err != nil { if err != nil {
utils.GetLogInstance().Debug("[SYNC] register failed to peer", "ip", peerConfig.ip, "port", peerConfig.port, "selfAddress", ss.selfAddress) utils.GetLogInstance().Debug("[SYNC] register failed to peer", "ip", peerConfig.ip, "port", peerConfig.port, "selfPeerHash", ss.selfPeerHash)
continue continue
} }
utils.GetLogInstance().Debug("[SYNC] register success", "ip", peerConfig.ip, "port", peerConfig.port) utils.GetLogInstance().Debug("[SYNC] register success", "ip", peerConfig.ip, "port", peerConfig.port)

@ -70,6 +70,8 @@ const (
ClientServicePortDiff = 5555 ClientServicePortDiff = 5555
maxBroadcastNodes = 10 // broadcast at most maxBroadcastNodes peers that need in sync maxBroadcastNodes = 10 // broadcast at most maxBroadcastNodes peers that need in sync
broadcastTimeout int64 = 3 * 60 * 1000000000 // 3 mins broadcastTimeout int64 = 3 * 60 * 1000000000 // 3 mins
//SyncIDLength is the length of bytes for syncID
SyncIDLength = 20
) )
// use to push new block to outofsync node // use to push new block to outofsync node
@ -114,6 +116,7 @@ type Node struct {
clientServer *clientService.Server clientServer *clientService.Server
// Syncing component. // Syncing component.
syncID [SyncIDLength]byte // a unique ID for the node during the state syncing process with peers
downloaderServer *downloader.Server downloaderServer *downloader.Server
stateSync *syncing.StateSync stateSync *syncing.StateSync
beaconSync *syncing.StateSync beaconSync *syncing.StateSync
@ -209,6 +212,11 @@ func (node *Node) countNumTransactionsInBlockchain() int {
return count return count
} }
// GetSyncID returns the syncID of this node
func (node *Node) GetSyncID() [SyncIDLength]byte {
return node.syncID
}
// New creates a new node. // New creates a new node.
func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database) *Node { func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database) *Node {
var chain *core.BlockChain var chain *core.BlockChain
@ -216,6 +224,7 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database) *N
var isFirstTime bool // if cannot get blockchain from database, then isFirstTime = true var isFirstTime bool // if cannot get blockchain from database, then isFirstTime = true
node := Node{} node := Node{}
copy(node.syncID[:], GenerateRandomString(SyncIDLength))
if host != nil { if host != nil {
node.host = host node.host = host
node.SelfPeer = host.GetSelfPeer() node.SelfPeer = host.GetSelfPeer()

@ -16,7 +16,6 @@ import (
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/internal/utils" "github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
peer "github.com/libp2p/go-libp2p-peer"
) )
// Constants related to doing syncing. // Constants related to doing syncing.
@ -33,13 +32,6 @@ func GetSyncingPort(nodePort string) string {
return "" return ""
} }
// (TODO) temporary, remove it later
func getPeerFromIPandPort(ip, port string) p2p.Peer {
priKey, _, _ := utils.GenKeyP2P(ip, port)
peerID, _ := peer.IDFromPrivateKey(priKey)
return p2p.Peer{IP: ip, Port: port, PeerID: peerID}
}
// getNeighborPeers is a helper function to return list of peers // getNeighborPeers is a helper function to return list of peers
// based on different neightbor map // based on different neightbor map
func (node *Node) getNeighborPeers(neighbor *sync.Map) []p2p.Peer { func (node *Node) getNeighborPeers(neighbor *sync.Map) []p2p.Peer {
@ -75,7 +67,7 @@ func (node *Node) DoBeaconSyncing() {
select { select {
case beaconBlock := <-node.BeaconBlockChannel: case beaconBlock := <-node.BeaconBlockChannel:
if node.beaconSync == nil { if node.beaconSync == nil {
node.beaconSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.Consensus.PubKey.GetAddress()) node.beaconSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID())
node.beaconSync.CreateSyncConfig(node.GetBeaconSyncingPeers()) node.beaconSync.CreateSyncConfig(node.GetBeaconSyncingPeers())
node.beaconSync.MakeConnectionToPeers() node.beaconSync.MakeConnectionToPeers()
} }
@ -110,7 +102,7 @@ func (node *Node) IsOutOfSync(consensusBlockInfo *consensus.BFTBlockInfo) bool {
// DoSync syncs with peers until catchup, this function is not coupled with consensus // DoSync syncs with peers until catchup, this function is not coupled with consensus
func (node *Node) DoSync() { func (node *Node) DoSync() {
<-node.peerReadyChan <-node.peerReadyChan
ss := syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.Consensus.PubKey.GetAddress()) ss := syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID())
if ss.CreateSyncConfig(node.GetSyncingPeers()) { if ss.CreateSyncConfig(node.GetSyncingPeers()) {
ss.MakeConnectionToPeers() ss.MakeConnectionToPeers()
ss.SyncLoop(node.blockchain, node.Worker) ss.SyncLoop(node.blockchain, node.Worker)
@ -150,7 +142,7 @@ func (node *Node) DoSyncing() {
} }
if node.stateSync == nil { if node.stateSync == nil {
node.stateSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.Consensus.PubKey.GetAddress()) node.stateSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID())
node.stateSync.CreateSyncConfig(node.GetSyncingPeers()) node.stateSync.CreateSyncConfig(node.GetSyncingPeers())
node.stateSync.MakeConnectionToPeers() node.stateSync.MakeConnectionToPeers()
} }
@ -201,14 +193,14 @@ func (node *Node) SendNewBlockToUnsync() {
if elapseTime > broadcastTimeout { if elapseTime > broadcastTimeout {
utils.GetLogInstance().Warn("[SYNC] SendNewBlockToUnsync to peer timeout", "peerID", peerID) utils.GetLogInstance().Warn("[SYNC] SendNewBlockToUnsync to peer timeout", "peerID", peerID)
// send last time and delete // send last time and delete
config.client.PushNewBlock(node.Consensus.PubKey.GetAddress(), blockHash, true) config.client.PushNewBlock(node.GetSyncID(), blockHash, true)
node.stateMutex.Lock() node.stateMutex.Lock()
node.peerRegistrationRecord[peerID].client.Close() node.peerRegistrationRecord[peerID].client.Close()
delete(node.peerRegistrationRecord, peerID) delete(node.peerRegistrationRecord, peerID)
node.stateMutex.Unlock() node.stateMutex.Unlock()
continue continue
} }
response := config.client.PushNewBlock(node.Consensus.PubKey.GetAddress(), blockHash, false) response := config.client.PushNewBlock(node.GetSyncID(), blockHash, false)
if response != nil && response.Type == downloader_pb.DownloaderResponse_INSYNC { if response != nil && response.Type == downloader_pb.DownloaderResponse_INSYNC {
node.stateMutex.Lock() node.stateMutex.Lock()
node.peerRegistrationRecord[peerID].client.Close() node.peerRegistrationRecord[peerID].client.Close()
@ -270,32 +262,30 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest) (*
node.stateSync.AddNewBlock(request.PeerHash, &blockObj) node.stateSync.AddNewBlock(request.PeerHash, &blockObj)
case downloader_pb.DownloaderRequest_REGISTER: case downloader_pb.DownloaderRequest_REGISTER:
peerAddress := common.BytesToAddress(request.PeerHash[:]).Hex() peerID := string(request.PeerHash[:])
if _, ok := node.peerRegistrationRecord[peerAddress]; ok { ip := request.Ip
port := request.Port
if _, ok := node.peerRegistrationRecord[peerID]; ok {
response.Type = downloader_pb.DownloaderResponse_FAIL response.Type = downloader_pb.DownloaderResponse_FAIL
utils.GetLogInstance().Warn("[SYNC] peerRegistration record already exists", "peerAddress", peerAddress) utils.GetLogInstance().Warn("[SYNC] peerRegistration record already exists", "ip", ip, "port", port)
return response, nil return response, nil
} else if len(node.peerRegistrationRecord) >= maxBroadcastNodes { } else if len(node.peerRegistrationRecord) >= maxBroadcastNodes {
response.Type = downloader_pb.DownloaderResponse_FAIL response.Type = downloader_pb.DownloaderResponse_FAIL
utils.GetLogInstance().Warn("[SYNC] maximum registration limit exceeds", "peerAddress", peerAddress) utils.GetLogInstance().Warn("[SYNC] maximum registration limit exceeds", "ip", ip, "port", port)
return response, nil return response, nil
} else { } else {
peer := node.Consensus.GetPeerByAddress(peerAddress)
response.Type = downloader_pb.DownloaderResponse_FAIL response.Type = downloader_pb.DownloaderResponse_FAIL
if peer == nil { syncPort := GetSyncingPort(port)
utils.GetLogInstance().Warn("[SYNC] unable to get peer from peerID", "peerAddress", peerAddress) client := downloader.ClientSetup(ip, syncPort)
return response, nil
}
client := downloader.ClientSetup(peer.IP, GetSyncingPort(peer.Port))
if client == nil { if client == nil {
utils.GetLogInstance().Warn("[SYNC] unable to setup client for peerID", "peerAddress", peerAddress) utils.GetLogInstance().Warn("[SYNC] unable to setup client for peerID", "ip", ip, "port", port)
return response, nil return response, nil
} }
config := &syncConfig{timestamp: time.Now().UnixNano(), client: client} config := &syncConfig{timestamp: time.Now().UnixNano(), client: client}
node.stateMutex.Lock() node.stateMutex.Lock()
node.peerRegistrationRecord[peerAddress] = config node.peerRegistrationRecord[peerID] = config
node.stateMutex.Unlock() node.stateMutex.Unlock()
utils.GetLogInstance().Debug("[SYNC] register peerID success", "peerAddress", peerAddress) utils.GetLogInstance().Debug("[SYNC] register peerID success", "ip", ip, "port", port)
response.Type = downloader_pb.DownloaderResponse_SUCCESS response.Type = downloader_pb.DownloaderResponse_SUCCESS
} }

@ -0,0 +1,21 @@
package node
import (
"math/rand"
"time"
)
var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
func init() {
rand.Seed(time.Now().UnixNano())
}
// GenerateRandomString generates a random string with given length
func GenerateRandomString(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}
Loading…
Cancel
Save