add dosyncing for node and fix test of getpeer

pull/125/head
Minh Doan 6 years ago
parent b6dd823812
commit 461db024bc
  1. 3
      benchmark.go
  2. 28
      node/node.go
  3. 2
      node/node_test.go

@ -85,7 +85,6 @@ func main() {
profile := flag.Bool("profile", false, "Turn on profiling (CPU, Memory).") profile := flag.Bool("profile", false, "Turn on profiling (CPU, Memory).")
metricsReportURL := flag.String("metrics_report_url", "", "If set, reports metrics to this URL.") metricsReportURL := flag.String("metrics_report_url", "", "If set, reports metrics to this URL.")
versionFlag := flag.Bool("version", false, "Output version info") versionFlag := flag.Bool("version", false, "Output version info")
syncNode := flag.Bool("sync_node", false, "Whether this node is a new node joining blockchain and it needs to get synced before joining consensus.")
onlyLogTps := flag.Bool("only_log_tps", false, "Only log TPS if true") onlyLogTps := flag.Bool("only_log_tps", false, "Only log TPS if true")
// This IP belongs to jenkins.harmony.one // This IP belongs to jenkins.harmony.one
@ -186,8 +185,6 @@ func main() {
currentNode := node.New(consensus, ldb, selfPeer) currentNode := node.New(consensus, ldb, selfPeer)
// Add self peer. // Add self peer.
currentNode.SelfPeer = selfPeer currentNode.SelfPeer = selfPeer
// Add sync node configuration.
currentNode.SyncNode = *syncNode
// If there is a client configured in the node list. // If there is a client configured in the node list.
if clientPeer != nil { if clientPeer != nil {
currentNode.ClientPeer = clientPeer currentNode.ClientPeer = clientPeer

@ -12,6 +12,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto"
@ -29,6 +30,7 @@ import (
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2pv2" "github.com/harmony-one/harmony/p2pv2"
proto_node "github.com/harmony-one/harmony/proto/node" proto_node "github.com/harmony-one/harmony/proto/node"
"github.com/harmony-one/harmony/syncing"
"github.com/harmony-one/harmony/syncing/downloader" "github.com/harmony-one/harmony/syncing/downloader"
downloader_pb "github.com/harmony-one/harmony/syncing/downloader/proto" downloader_pb "github.com/harmony-one/harmony/syncing/downloader/proto"
) )
@ -46,6 +48,12 @@ const (
NodeLeader // Node is the leader of some shard. NodeLeader // Node is the leader of some shard.
) )
// Constants related to doing syncing.
const (
NotDoingSyncing uint32 = iota
DoingSyncing
)
const ( const (
// TimeToSleepForSyncing is the time waiting for node transformed into NodeDoingConsensus // TimeToSleepForSyncing is the time waiting for node transformed into NodeDoingConsensus
TimeToSleepForSyncing = time.Second * 30 TimeToSleepForSyncing = time.Second * 30
@ -77,7 +85,6 @@ type Node struct {
SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work. SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work.
IDCPeer p2p.Peer IDCPeer p2p.Peer
SyncNode bool // TODO(minhdoan): Remove it later.
chain *core.BlockChain // Account Model chain *core.BlockChain // Account Model
Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer
State State // State of the Node State State // State of the Node
@ -92,6 +99,8 @@ type Node struct {
// Syncing component. // Syncing component.
downloaderServer *downloader.Server downloaderServer *downloader.Server
stateSync *syncing.StateSync
syncingState uint32
// Test only // Test only
TestBankKeys []*ecdsa.PrivateKey TestBankKeys []*ecdsa.PrivateKey
@ -150,10 +159,6 @@ func (node *Node) getTransactionsForNewBlockAccount(maxNumTxs int) (types.Transa
// StartServer starts a server and process the request by a handler. // StartServer starts a server and process the request by a handler.
func (node *Node) StartServer(port string) { func (node *Node) StartServer(port string) {
if node.SyncNode {
// Disable this temporarily.
// node.blockchain = syncing.StartBlockSyncing(node.Consensus.GetValidatorPeers())
}
if p2p.Version == 1 { if p2p.Version == 1 {
fmt.Println("going to start server on port:", port) fmt.Println("going to start server on port:", port)
//node.log.Debug("Starting server", "node", node, "port", port) //node.log.Debug("Starting server", "node", node, "port", port)
@ -320,12 +325,23 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase, selfPeer p2p.Peer) *Node
node.State = NodeInit node.State = NodeInit
} }
// Setup initial state of syncing.
node.syncingState = NotDoingSyncing
return &node return &node
} }
// DoSyncing starts syncing. // DoSyncing starts syncing.
func (node *Node) DoSyncing() { func (node *Node) DoSyncing() {
// If this node is currently doing sync, another call for syncing will be returned immediately.
if !atomic.CompareAndSwapUint32(&node.syncingState, NotDoingSyncing, DoingSyncing) {
return
}
defer atomic.StoreUint32(&node.syncingState, NotDoingSyncing)
if node.stateSync != nil {
node.stateSync = syncing.GetStateSync()
}
node.stateSync.StartStateSync(node.GetPeers(), node.blockchain)
} }
// AddPeers adds neighbors nodes // AddPeers adds neighbors nodes

@ -65,7 +65,7 @@ func TestGetPeers(t *testing.T) {
node.Neighbors.Store("minh", peer) node.Neighbors.Store("minh", peer)
node.Neighbors.Store("mark", peer2) node.Neighbors.Store("mark", peer2)
res := node.GetPeers() res := node.GetPeers()
if len(res) != 2 || res[0] != peer || res[1] != peer2 { if len(res) != 2 || !((res[0] == peer && res[1] == peer2) || (res[1] == peer && res[0] == peer2)) {
t.Error("GetPeers should return list of {peer, peer2}") t.Error("GetPeers should return list of {peer, peer2}")
} }
} }

Loading…
Cancel
Save