Merge branch 'master' of github.com:harmony-one/harmony into rj_branch

pull/99/head
Rongjian Lan 6 years ago
commit c40fe757e1
  1. 13
      blockchain/blockchain.go
  2. 47
      node/node.go
  3. 33
      syncing/syncing.go
  4. 34
      syncing/syncing_test.go

@ -233,16 +233,19 @@ func CreateBlockchain(address [20]byte, shardID uint32) *Blockchain {
return &bc
}
// CreateBlockchainWithMoreBlocks ...
// CreateBlockchainWithMoreBlocks is used for syncing testing.
func CreateBlockchainWithMoreBlocks(addresses [][20]byte, shardID uint32) *Blockchain {
blocks := make([]*Block, 0)
return &Blockchain{CreateMoreBlocks(addresses, shardID)}
}
// CreateMoreBlocks is used for syncing testing.
func CreateMoreBlocks(addresses [][20]byte, shardID uint32) []*Block {
blocks := []*Block{}
for _, address := range addresses {
cbtx := NewCoinbaseTX(address, genesisCoinbaseData, shardID)
blocks = append(blocks, NewGenesisBlock(cbtx, shardID))
}
bc := Blockchain{blocks}
return &bc
return blocks
}
// CreateStateBlock creates state block based on the utxos.

@ -14,29 +14,32 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/blockchain"
"github.com/harmony-one/harmony/client"
bft "github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm"
"github.com/harmony-one/harmony/crypto/pki"
hdb "github.com/harmony-one/harmony/db"
"github.com/harmony-one/harmony/log"
"github.com/harmony-one/harmony/node/worker"
"github.com/harmony-one/harmony/p2p"
proto_identity "github.com/harmony-one/harmony/proto/identity"
proto_node "github.com/harmony-one/harmony/proto/node"
"github.com/harmony-one/harmony/syncing/downloader"
downloader_pb "github.com/harmony-one/harmony/syncing/downloader/proto"
)
type NodeState byte
const (
NodeInit NodeState = iota // Node just started, before contacting BeaconChain
NodeWaitToJoin // Node contacted BeaconChain, wait to join Shard
NodeJoinedShard // Node joined Shard, ready for consensus
NodeOffline // Node is offline
NodeInit NodeState = iota // Node just started, before contacting BeaconChain
NodeWaitToJoin // Node contacted BeaconChain, wait to join Shard
NodeJoinedShard // Node joined Shard, ready for consensus
NodeOffline // Node is offline
NodeReadyForConsensus // Node is ready to do consensus
NodeDoingConsensus // Node is already doing consensus
)
type NetworkNode struct {
@ -78,6 +81,9 @@ type Node struct {
BlockChannelAccount chan *types.Block // The channel to receive new blocks from Node
Worker *worker.Worker
// Syncing component.
downloaderServer *downloader.Server
// Test only
TestBankKeys []*ecdsa.PrivateKey
}
@ -315,6 +321,7 @@ func (node *Node) AddPeers(peers []p2p.Peer) int {
return count
}
// JoinShard helps a new node to join a shard.
func (node *Node) JoinShard(leader p2p.Peer) {
// try to join the shard, with 10 minutes time-out
backoff := p2p.NewExpBackoff(1*time.Second, 10*time.Minute, 2)
@ -328,3 +335,25 @@ func (node *Node) JoinShard(leader p2p.Peer) {
node.log.Debug("Sent ping message")
}
}
// StartDownloaderServer starts downloader server.
func (node *Node) StartDownloaderServer() {
node.downloaderServer = downloader.NewServer(node)
// node.downloaderServer.Start(node.)
}
// CalculateResponse implements DownloadInterface on Node object.
func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest) (*downloader_pb.DownloaderResponse, error) {
response := &downloader_pb.DownloaderResponse{}
if request.Type == downloader_pb.DownloaderRequest_HEADER {
for _, block := range node.blockchain.Blocks {
response.Payload = append(response.Payload, block.Hash[:])
}
} else {
for i := range request.Hashes {
block := node.blockchain.FindBlock(request.Hashes[i])
response.Payload = append(response.Payload, block.Serialize())
}
}
return response, nil
}

@ -140,7 +140,8 @@ func (ss *StateSync) getConsensusHashes() {
}
go func(peerConfig *SyncPeerConfig) {
defer wg.Done()
peerConfig.client.GetBlockHashes()
response := peerConfig.client.GetBlockHashes()
peerConfig.blockHashes = response.Payload
}(&ss.syncConfig.peers[id])
}
wg.Wait()
@ -151,14 +152,18 @@ func (ss *StateSync) getConsensusHashes() {
}
// getConsensusHashes gets all hashes needed to download.
func (ss *StateSync) generateStateSyncTaskQueue() {
func (ss *StateSync) generateStateSyncTaskQueue(bc *blockchain.Blockchain) {
ss.stateSyncTaskQueue = queue.New(0)
for _, configPeer := range ss.syncConfig.peers {
if configPeer.client != nil {
ss.blockHeight = len(configPeer.blockHashes)
bc.Blocks = append(bc.Blocks, make([]*blockchain.Block, ss.blockHeight-len(bc.Blocks))...)
for id, blockHash := range configPeer.blockHashes {
ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash})
if bc.Blocks[id] == nil || !reflect.DeepEqual(bc.Blocks[id].Hash[:], blockHash) {
ss.stateSyncTaskQueue.Put(SyncBlockTask{index: id, blockHash: blockHash})
}
}
ss.blockHeight = len(configPeer.blockHashes)
break
}
}
@ -167,7 +172,6 @@ func (ss *StateSync) generateStateSyncTaskQueue() {
// downloadBlocks downloads blocks from state sync task queue.
func (ss *StateSync) downloadBlocks(bc *blockchain.Blockchain) {
// Initialize blockchain
bc.Blocks = make([]*blockchain.Block, ss.blockHeight)
var wg sync.WaitGroup
wg.Add(ss.activePeerNumber)
for i := range ss.syncConfig.peers {
@ -203,15 +207,20 @@ func (ss *StateSync) downloadBlocks(bc *blockchain.Blockchain) {
func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) {
// Creates sync config.
ss.CreateSyncConfig(peers)
// Makes connections to peers.
ss.makeConnectionToPeers()
for {
// Gets consensus hashes.
ss.getConsensusHashes()
// Gets consensus hashes.
ss.getConsensusHashes()
// Generates state-sync task queue.
ss.generateStateSyncTaskQueue()
// Generates state-sync task queue.
ss.generateStateSyncTaskQueue(bc)
ss.downloadBlocks(bc)
// Download blocks.
if ss.stateSyncTaskQueue.Len() > 0 {
ss.downloadBlocks(bc)
} else {
break
}
}
}

@ -22,20 +22,23 @@ const (
)
var (
PriIntOne = 111
PriIntTwo = 222
TestAddressOne = pki.GetAddressFromInt(PriIntOne)
TestAddressTwo = pki.GetAddressFromInt(PriIntTwo)
ShardID = uint32(0)
ServerPorts = []string{serverPort1, serverPort2, serverPort3}
PriIntOne = 111
PriIntTwo = 222
PriIntThree = 222
TestAddressOne = pki.GetAddressFromInt(PriIntOne)
TestAddressTwo = pki.GetAddressFromInt(PriIntTwo)
TestAddressThree = pki.GetAddressFromInt(PriIntThree)
ShardID = uint32(0)
ServerPorts = []string{serverPort1, serverPort2, serverPort3}
)
type FakeNode struct {
bc *bc.Blockchain
server *downloader.Server
ip string
port string
grpcServer *grpc.Server
bc *bc.Blockchain
server *downloader.Server
ip string
port string
grpcServer *grpc.Server
doneFirstTime bool
}
// GetBlockHashes used for state download.
@ -73,12 +76,21 @@ func (node *FakeNode) Start() error {
return err
}
func (node *FakeNode) addOneMoreBlock() {
addresses := [][20]byte{TestAddressThree}
node.bc.Blocks = append(node.bc.Blocks, bc.CreateMoreBlocks(addresses, ShardID)...)
}
func (node *FakeNode) CalculateResponse(request *pb.DownloaderRequest) (*pb.DownloaderResponse, error) {
response := &pb.DownloaderResponse{}
if request.Type == pb.DownloaderRequest_HEADER {
for _, block := range node.bc.Blocks {
response.Payload = append(response.Payload, block.Hash[:])
}
if !node.doneFirstTime {
node.addOneMoreBlock()
}
node.doneFirstTime = true
} else {
for i := range request.Hashes {
block := node.bc.FindBlock(request.Hashes[i])

Loading…
Cancel
Save