merge conflicts

pull/119/head
Richard Liu 6 years ago
commit 14ede10616
  1. 9
      benchmark.go
  2. 8
      client/txgen/main.go
  3. 2
      core/tx_pool_test.go
  4. 3
      local_config5.txt
  5. 81
      node/node.go
  6. 32
      node/node_handler.go
  7. 46
      node/node_test.go
  8. 3
      proto/node/node.go
  9. 50
      syncing/syncing.go
  10. 2
      syncing/syncing_test.go

@ -71,7 +71,6 @@ func loggingInit(logFolder, role, ip, port string, onlyLogTps bool) {
h = log.MatchFilterHandler("msg", "TPS Report", h)
}
log.Root().SetHandler(h)
}
func main() {
@ -87,7 +86,6 @@ func main() {
profile := flag.Bool("profile", false, "Turn on profiling (CPU, Memory).")
metricsReportURL := flag.String("metrics_report_url", "", "If set, reports metrics to this URL.")
versionFlag := flag.Bool("version", false, "Output version info")
syncNode := flag.Bool("sync_node", false, "Whether this node is a new node joining blockchain and it needs to get synced before joining consensus.")
onlyLogTps := flag.Bool("only_log_tps", false, "Only log TPS if true")
//This IP belongs to jenkins.harmony.one
@ -181,8 +179,8 @@ func main() {
attack.GetInstance().SetLogger(consensus.Log)
// Current node.
currentNode := node.New(consensus, ldb, selfPeer)
// Add sync node configuration.
currentNode.SyncNode = *syncNode
// Add self peer.
currentNode.SelfPeer = selfPeer
// If there is a client configured in the node list.
if clientPeer != nil {
currentNode.ClientPeer = clientPeer
@ -198,6 +196,7 @@ func main() {
currentNode.State = node.NodeWaitToJoin
if consensus.IsLeader {
currentNode.State = node.NodeLeader
if *accountModel {
// Let consensus run
go func() {
@ -220,6 +219,8 @@ func main() {
} else {
if *peerDiscovery {
go currentNode.JoinShard(leader)
} else {
currentNode.State = node.NodeDoingConsensus
}
}

@ -76,9 +76,9 @@ func main() {
)
log.Root().SetHandler(h)
clientPeer := config.GetClientPeer()
// Nodes containing utxopools to mirror the shards' data in the network
nodes := []*node.Node{}
clientPeer := config.GetClientPeer()
for shardID := range shardIDLeaderMap {
_, pubKey := utils.GenKey(clientPeer.IP, clientPeer.Port)
clientPeer.PubKey = pubKey
@ -90,7 +90,7 @@ func main() {
// Client/txgenerator server node setup
if clientPeer == nil {
log.Error("Client Peer is nil!")
panic("Client Peer is nil!")
}
consensusObj := consensus.New(*clientPeer, "0", nil, p2p.Peer{})
clientNode := node.New(consensusObj, nil, *clientPeer)
@ -136,9 +136,7 @@ func main() {
clientNode.Client.UpdateBlocks = updateBlocksFunc
// Start the client server to listen to leader's message
go func() {
clientNode.StartServer()
}()
go clientNode.StartServer()
// Transaction generation process
time.Sleep(10 * time.Second) // wait for nodes to be ready

@ -126,7 +126,7 @@ func validateEvents(events chan NewTxsEvent, count int) error {
case ev := <-events:
received = append(received, ev.Txs...)
case <-time.After(time.Second):
return fmt.Errorf("event #%d not fired", received)
return fmt.Errorf("event #%d not fired", len(received))
}
}
if len(received) > count {

@ -0,0 +1,3 @@
127.0.0.1 9000 leader 0
127.0.0.1 9001 validator 0
127.0.0.1 9999 client 0

@ -12,6 +12,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/crypto"
@ -29,6 +30,7 @@ import (
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/p2pv2"
proto_node "github.com/harmony-one/harmony/proto/node"
"github.com/harmony-one/harmony/syncing"
"github.com/harmony-one/harmony/syncing/downloader"
downloader_pb "github.com/harmony-one/harmony/syncing/downloader/proto"
)
@ -42,14 +44,19 @@ const (
NodeWaitToJoin // Node contacted BeaconChain, wait to join Shard
NodeJoinedShard // Node joined Shard, ready for consensus
NodeOffline // Node is offline
NodeReadyForConsensus // Node is ready to do consensus
NodeReadyForConsensus // Node is ready for doing consensus
NodeDoingConsensus // Node is already doing consensus
NodeLeader // Node is the leader of some shard.
)
// Constants related to doing syncing.
const (
// TimeToSleepForSyncing is the time waiting for node transformed into NodeDoingConsensus
TimeToSleepForSyncing = time.Second * 30
NotDoingSyncing uint32 = iota
DoingSyncing
)
const (
syncingPortDifference = 1000
waitBeforeJoinShard = time.Second * 3
timeOutToJoinShard = time.Minute * 10
)
@ -80,7 +87,6 @@ type Node struct {
SelfPeer p2p.Peer // TODO(minhdoan): it could be duplicated with Self below whose is Alok work.
IDCPeer p2p.Peer
SyncNode bool // TODO(minhdoan): Remove it later.
chain *core.BlockChain // Account Model
Neighbors sync.Map // All the neighbor nodes, key is the sha256 of Peer IP/Port, value is the p2p.Peer
State State // State of the Node
@ -95,6 +101,8 @@ type Node struct {
// Syncing component.
downloaderServer *downloader.Server
stateSync *syncing.StateSync
syncingState uint32
// Test only
TestBankKeys []*ecdsa.PrivateKey
@ -153,10 +161,6 @@ func (node *Node) getTransactionsForNewBlockAccount(maxNumTxs int) (types.Transa
// StartServer starts a server and process the request by a handler.
func (node *Node) StartServer() {
if node.SyncNode {
// Disable this temporarily.
// node.blockchain = syncing.StartBlockSyncing(node.Consensus.GetValidatorPeers())
}
if p2p.Version == 1 {
node.log.Debug("Starting server", "node", node, "ip", node.SelfPeer.IP, "port", node.SelfPeer.Port)
node.listenOnPort(node.SelfPeer.Port)
@ -322,9 +326,32 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase, selfPeer p2p.Peer) *Node
node.State = NodeInit
}
// Setup initial state of syncing.
node.syncingState = NotDoingSyncing
return &node
}
// DoSyncing starts syncing.
func (node *Node) DoSyncing() {
// If this node is currently doing sync, another call for syncing will be returned immediately.
if !atomic.CompareAndSwapUint32(&node.syncingState, NotDoingSyncing, DoingSyncing) {
return
}
defer atomic.StoreUint32(&node.syncingState, NotDoingSyncing)
if node.stateSync == nil {
node.stateSync = syncing.GetStateSync()
}
if node.stateSync.StartStateSync(node.GetSyncingPeers(), node.blockchain) {
node.log.Debug("DoSyncing: successfully sync")
if node.State == NodeJoinedShard {
node.State = NodeReadyForConsensus
}
} else {
node.log.Debug("DoSyncing: failed to sync")
}
}
// AddPeers adds neighbors nodes
func (node *Node) AddPeers(peers []*p2p.Peer) int {
count := 0
@ -347,6 +374,35 @@ func (node *Node) AddPeers(peers []*p2p.Peer) int {
return count
}
// GetSyncingPort returns the syncing port.
func GetSyncingPort(nodePort string) string {
if port, err := strconv.Atoi(nodePort); err == nil {
return fmt.Sprintf("%d", port-syncingPortDifference)
}
os.Exit(1)
return ""
}
// GetSyncingPeers returns list of peers.
// Right now, the list length is only 1 for testing.
// TODO(mihdoan): fix it later.
func (node *Node) GetSyncingPeers() []p2p.Peer {
res := []p2p.Peer{}
node.Neighbors.Range(func(k, v interface{}) bool {
node.log.Debug("GetSyncingPeers-Range: ", "k", k, "v", v)
if len(res) == 0 {
res = append(res, v.(p2p.Peer))
}
return true
})
for i := range res {
res[i].Port = GetSyncingPort(res[i].Port)
}
node.log.Debug("GetSyncingPeers: ", "res", res)
return res
}
// JoinShard helps a new node to join a shard.
func (node *Node) JoinShard(leader p2p.Peer) {
// try to join the shard, with 10 minutes time-out
@ -375,12 +431,9 @@ func (node *Node) InitSyncingServer() {
// StartSyncingServer starts syncing server.
func (node *Node) StartSyncingServer() {
if port, err := strconv.Atoi(node.SelfPeer.Port); err == nil {
node.downloaderServer.Start(node.SelfPeer.IP, fmt.Sprintf("%d", port-1000))
} else {
node.log.Error("Wrong port format provided")
os.Exit(1)
}
port := GetSyncingPort(node.SelfPeer.Port)
node.log.Info("support_sycning: StartSyncingServer on port:", "port", port)
node.downloaderServer.Start(node.SelfPeer.IP, GetSyncingPort(node.SelfPeer.Port))
}
// CalculateResponse implements DownloadInterface on Node object.

@ -90,6 +90,10 @@ func (node *Node) NodeHandler(conn net.Conn) {
}
}
case proto.Consensus:
if !(node.State == NodeDoingConsensus || node.State == NodeLeader || node.State == NodeReadyForConsensus) {
node.log.Info("This node with ", "peer", node.SelfPeer, "can not join consensus because they are either not noding consensus or not a leader", nil)
break
}
actionType := consensus.ConMessageType(msgType)
switch actionType {
case consensus.Consensus:
@ -99,6 +103,8 @@ func (node *Node) NodeHandler(conn net.Conn) {
} else {
node.log.Info("NET: received message: Consensus/Validator")
consensusObj.ProcessMessageValidator(msgPayload)
// TODO(minhdoan): add logic to check if the current blockchain is not sync with other consensus
// we should switch to other state rather than DoingConsensus.
}
}
case proto.Node:
@ -200,6 +206,11 @@ func (node *Node) NodeHandler(conn net.Conn) {
default:
node.log.Error("Unknown", "MsgCateory:", msgCategory)
}
// Post processing after receiving messsages.
if node.State == NodeJoinedShard || node.State == NodeReadyForConsensus {
go node.DoSyncing()
}
}
func (node *Node) transactionMessageHandler(msgPayload []byte) {
@ -389,18 +400,26 @@ func (node *Node) BroadcastNewBlock(newBlock *blockchain.Block) {
// VerifyNewBlock is called by consensus participants to verify the block they are running consensus on
func (node *Node) VerifyNewBlock(newBlock *blockchain.Block) bool {
// TODO: just a reminder for syncing. we need to check if the new block is fit with the current blockchain.
// The current blockchain can be in the progress of being synced.
var verified bool
if newBlock.AccountBlock != nil {
accountBlock := new(types.Block)
err := rlp.DecodeBytes(newBlock.AccountBlock, accountBlock)
if err != nil {
node.log.Error("Failed decoding the block with RLP")
}
return node.VerifyNewBlockAccount(accountBlock)
verified = node.VerifyNewBlockAccount(accountBlock)
} else if newBlock.IsStateBlock() {
verified = node.UtxoPool.VerifyStateBlock(newBlock)
} else {
verified = node.UtxoPool.VerifyTransactions(newBlock.Transactions)
}
if newBlock.IsStateBlock() {
return node.UtxoPool.VerifyStateBlock(newBlock)
if verified {
// Change the syncing state.
node.State = NodeDoingConsensus
}
return node.UtxoPool.VerifyTransactions(newBlock.Transactions)
return verified
}
// VerifyNewBlockAccount is called by consensus participants to verify the block (account model) they are running consensus on
@ -552,9 +571,6 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
node.log.Error("Can't get Pong Message")
return -1
}
// node.log.Info("Pong", "Msg", pong)
// TODO (lc) state syncing, and wait for all public keys
node.State = NodeJoinedShard
peers := make([]*p2p.Peer, 0)
@ -593,5 +609,7 @@ func (node *Node) pongMessageHandler(msgPayload []byte) int {
publicKeys = append(publicKeys, key)
}
node.State = NodeJoinedShard
return node.Consensus.UpdatePublicKeys(publicKeys)
}

@ -14,7 +14,7 @@ import (
"github.com/harmony-one/harmony/utils"
)
func TestNewNewNode(test *testing.T) {
func TestNewNewNode(t *testing.T) {
_, pubKey := utils.GenKey("1", "2")
leader := p2p.Peer{IP: "1", Port: "2", PubKey: pubKey}
validator := p2p.Peer{IP: "3", Port: "5"}
@ -22,27 +22,27 @@ func TestNewNewNode(test *testing.T) {
node := New(consensus, nil, leader)
if node.Consensus == nil {
test.Error("Consensus is not initialized for the node")
t.Error("Consensus is not initialized for the node")
}
if node.blockchain == nil {
test.Error("Blockchain is not initialized for the node")
t.Error("Blockchain is not initialized for the node")
}
if len(node.blockchain.Blocks) != 1 {
test.Error("Genesis block is not initialized for the node")
t.Error("Genesis block is not initialized for the node")
}
if len(node.blockchain.Blocks[0].Transactions) != 1 {
test.Error("Coinbase TX is not initialized for the node")
t.Error("Coinbase TX is not initialized for the node")
}
if node.UtxoPool == nil {
test.Error("Utxo pool is not initialized for the node")
t.Error("Utxo pool is not initialized for the node")
}
}
func TestCountNumTransactionsInBlockchain(test *testing.T) {
func TestCountNumTransactionsInBlockchain(t *testing.T) {
_, pubKey := utils.GenKey("1", "2")
leader := p2p.Peer{IP: "1", Port: "2", PubKey: pubKey}
validator := p2p.Peer{IP: "3", Port: "5"}
@ -51,11 +51,31 @@ func TestCountNumTransactionsInBlockchain(test *testing.T) {
node := New(consensus, nil, leader)
node.AddTestingAddresses(1000)
if node.countNumTransactionsInBlockchain() != 1001 {
test.Error("Count of transactions in the blockchain is incorrect")
t.Error("Count of transactions in the blockchain is incorrect")
}
}
func TestAddPeers(test *testing.T) {
func TestGetSyncingPeers(t *testing.T) {
_, pubKey := utils.GenKey("1", "2")
leader := p2p.Peer{IP: "1", Port: "2", PubKey: pubKey}
validator := p2p.Peer{IP: "3", Port: "5"}
consensus := consensus.New(leader, "0", []p2p.Peer{leader, validator}, leader)
node := New(consensus, nil, leader)
peer := p2p.Peer{IP: "1.1.1.1", Port: "2000"}
peer2 := p2p.Peer{IP: "2.1.1.1", Port: "2000"}
node.Neighbors.Store("minh", peer)
node.Neighbors.Store("mark", peer2)
res := node.GetSyncingPeers()
if len(res) != 1 || !(res[0].IP == peer.IP || res[0].IP == peer2.IP) {
t.Error("GetSyncingPeers should return list of {peer, peer2}")
}
if len(res) != 1 || res[0].Port != "1000" {
t.Error("Syncing ports should be 1000")
}
}
func TestAddPeers(t *testing.T) {
priKey1 := crypto.Ed25519Curve.Scalar().SetInt64(int64(333))
pubKey1 := pki.GetPublicKeyFromScalar(priKey1)
@ -87,12 +107,12 @@ func TestAddPeers(test *testing.T) {
r1 := node.AddPeers(peers1)
e1 := 2
if r1 != e1 {
test.Errorf("Add %v peers, expectd %v", r1, e1)
t.Errorf("Add %v peers, expectd %v", r1, e1)
}
r2 := node.AddPeers(peers1)
e2 := 0
if r2 != e2 {
test.Errorf("Add %v peers, expectd %v", r2, e2)
t.Errorf("Add %v peers, expectd %v", r2, e2)
}
}
@ -143,8 +163,8 @@ func sendPongMessage(leader p2p.Peer) {
}
func exitServer() {
fmt.Println("wait 15 seconds to terminate the process ...")
time.Sleep(15 * time.Second)
fmt.Println("wait 5 seconds to terminate the process ...")
time.Sleep(5 * time.Second)
os.Exit(0)
}

@ -3,7 +3,6 @@ package node
import (
"bytes"
"encoding/gob"
"fmt"
"log"
"github.com/ethereum/go-ethereum/rlp"
@ -161,7 +160,7 @@ func ConstructTransactionListMessageAccount(transactions types.Transactions) []b
txs, err := rlp.EncodeToBytes(transactions)
if err != nil {
fmt.Errorf("ERROR RLP %s", err)
log.Fatal(err)
return []byte{} // TODO(RJ): better handle of the error
}
byteBuffer.Write(txs)

@ -9,13 +9,16 @@ import (
"github.com/Workiva/go-datastructures/queue"
"github.com/harmony-one/harmony/blockchain"
"github.com/harmony-one/harmony/log"
"github.com/harmony-one/harmony/p2p"
"github.com/harmony-one/harmony/syncing/downloader"
)
// Constants for syncing.
const (
ConsensusRatio = float64(0.66)
ConsensusRatio = float64(0.66)
SleepTimeAfterNonConsensusBlockHashes = time.Second * 30
TimesToFail = 5
)
// SyncPeerConfig is peer config to sync.
@ -26,6 +29,8 @@ type SyncPeerConfig struct {
blockHashes [][]byte
}
var Log = log.New()
// SyncBlockTask is the task struct to sync a specific block.
type SyncBlockTask struct {
index int
@ -111,7 +116,10 @@ func (ss *StateSync) ProcessStateSyncFromPeers(peers []p2p.Peer, bc *blockchain.
// CreateSyncConfig creates SyncConfig for StateSync object.
func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) {
Log.Debug("CreateSyncConfig: len of peers", "len", len(peers))
Log.Debug("CreateSyncConfig: len of peers", "peers", peers)
ss.peerNumber = len(peers)
Log.Debug("CreateSyncConfig: hello")
ss.syncConfig = &SyncConfig{
peers: make([]*SyncPeerConfig, ss.peerNumber),
}
@ -120,7 +128,9 @@ func (ss *StateSync) CreateSyncConfig(peers []p2p.Peer) {
ip: peers[id].IP,
port: peers[id].Port,
}
Log.Debug("CreateSyncConfig: peer port to connect", "port", peers[id].Port)
}
Log.Info("syncing: Finished creating SyncConfig.")
}
// MakeConnectionToPeers makes grpc connection to all peers.
@ -136,6 +146,7 @@ func (ss *StateSync) MakeConnectionToPeers() {
}
wg.Wait()
ss.CleanUpNilPeers()
Log.Info("syncing: Finished making connection to peers.")
}
// CleanUpNilPeers cleans up peer with nil client and recalculate activePeerNumber.
@ -211,7 +222,8 @@ func (ss *StateSync) GetBlockHashesConsensusAndCleanUp() bool {
}
// GetConsensusHashes gets all hashes needed to download.
func (ss *StateSync) GetConsensusHashes() {
func (ss *StateSync) GetConsensusHashes() bool {
count := 0
for {
var wg sync.WaitGroup
wg.Add(ss.activePeerNumber)
@ -230,7 +242,15 @@ func (ss *StateSync) GetConsensusHashes() {
if ss.GetBlockHashesConsensusAndCleanUp() {
break
}
if count > TimesToFail {
Log.Info("GetConsensusHashes: reached # of times to failed")
return false
}
count++
time.Sleep(SleepTimeAfterNonConsensusBlockHashes)
}
Log.Info("syncing: Finished getting consensus block hashes.")
return true
}
// getConsensusHashes gets all hashes needed to download.
@ -249,6 +269,7 @@ func (ss *StateSync) generateStateSyncTaskQueue(bc *blockchain.Blockchain) {
break
}
}
Log.Info("syncing: Finished generateStateSyncTaskQueue.")
}
// downloadBlocks downloads blocks from state sync task queue.
@ -283,26 +304,23 @@ func (ss *StateSync) downloadBlocks(bc *blockchain.Blockchain) {
}(ss.syncConfig.peers[i], ss.stateSyncTaskQueue, bc)
}
wg.Wait()
Log.Info("syncing: Finished downloadBlocks.")
}
// StartStateSync starts state sync.
func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) {
func (ss *StateSync) StartStateSync(peers []p2p.Peer, bc *blockchain.Blockchain) bool {
// Creates sync config.
ss.CreateSyncConfig(peers)
// Makes connections to peers.
ss.MakeConnectionToPeers()
for {
// Gets consensus hashes.
ss.GetConsensusHashes()
// Generates state-sync task queue.
ss.generateStateSyncTaskQueue(bc)
// Download blocks.
if ss.stateSyncTaskQueue.Len() > 0 {
ss.downloadBlocks(bc)
} else {
break
}
// Gets consensus hashes.
if !ss.GetConsensusHashes() {
return false
}
ss.generateStateSyncTaskQueue(bc)
// Download blocks.
if ss.stateSyncTaskQueue.Len() > 0 {
ss.downloadBlocks(bc)
}
return true
}

@ -181,7 +181,7 @@ func TestSyncingIncludingBadNode(t *testing.T) {
peers[i].Port = fakeNodes[i].port
}
stateSync.StartStateSync(peers, bc)
assert.True(t, stateSync.StartStateSync(peers, bc), "should return true")
for i := range bc.Blocks {
if !reflect.DeepEqual(bc.Blocks[i], fakeNodes[0].bc.Blocks[i]) {

Loading…
Cancel
Save