From 96576ef0c58cf1a6e509173418b3b4d8a14ec86d Mon Sep 17 00:00:00 2001 From: ak Date: Mon, 13 May 2019 15:59:47 -0700 Subject: [PATCH] working local txgen --- cmd/client/txgen/main.go | 218 ++++++++++++------------ internal/txgen/account_txs_generator.go | 36 ---- node/node.go | 4 +- node/node_handler.go | 4 +- node/node_syncing.go | 5 + node/puzzle_contract.go | 10 +- 6 files changed, 125 insertions(+), 152 deletions(-) delete mode 100644 internal/txgen/account_txs_generator.go diff --git a/cmd/client/txgen/main.go b/cmd/client/txgen/main.go index 055e204d8..371f169f9 100644 --- a/cmd/client/txgen/main.go +++ b/cmd/client/txgen/main.go @@ -3,27 +3,28 @@ package main import ( "flag" "fmt" + "math/big" + "math/rand" "os" "path" "runtime" "sync" "time" - bls2 "github.com/harmony-one/bls/ffi/go/bls" - + "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core" - "github.com/harmony-one/harmony/internal/utils/contract" - - "github.com/harmony-one/harmony/crypto/bls" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/params" + bls2 "github.com/harmony-one/bls/ffi/go/bls" "github.com/harmony-one/harmony/api/client" proto_node "github.com/harmony-one/harmony/api/proto/node" - "github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/crypto/bls" nodeconfig "github.com/harmony-one/harmony/internal/configs/node" - "github.com/harmony-one/harmony/internal/txgen" "github.com/harmony-one/harmony/internal/utils" + "github.com/harmony-one/harmony/internal/utils/contract" "github.com/harmony-one/harmony/node" "github.com/harmony-one/harmony/p2p" p2p_host "github.com/harmony-one/harmony/p2p/host" @@ -38,6 +39,16 @@ var ( stateMutex sync.Mutex ) +const ( + checkFrequency = 5 //checkfrequency checks whether the transaction generator is ready to send the next batch of transactions. +) + +// Settings is the settings for TX generation. No Cross-Shard Support! +type Settings struct { + NumOfAddress int + MaxNumTxsPerBatch int +} + func printVersion(me string) { fmt.Fprintf(os.Stderr, "Harmony (C) 2018. %v, version %v-%v (%v %v)\n", path.Base(me), version, commit, builtBy, builtAt) os.Exit(0) @@ -53,14 +64,12 @@ func main() { logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution") duration := flag.Int("duration", 10, "duration of the tx generation in second. If it's negative, the experiment runs forever.") versionFlag := flag.Bool("version", false, "Output version info") - crossShardRatio := flag.Int("cross_shard_ratio", 30, "The percentage of cross shard transactions.") - + crossShardRatio := flag.Int("cross_shard_ratio", 30, "The percentage of cross shard transactions.") //Keeping this for backward compatibility + shardIDFlag := flag.Int("shardID", 0, "The shardID the node belongs to.") // Key file to store the private key keyFile := flag.String("key", "./.txgenkey", "the private key file of the txgen") flag.Var(&utils.BootNodes, "bootnodes", "a list of bootnode multiaddress") - flag.Parse() - if *versionFlag { printVersion(os.Args[0]) } @@ -79,7 +88,6 @@ func main() { utils.BootNodes = bootNodeAddrs } - var shardIDs []uint32 nodePriKey, _, err := utils.LoadKeyFromFile(*keyFile) if err != nil { panic(err) @@ -89,25 +97,21 @@ func main() { if peerPubKey == nil { panic(fmt.Errorf("generate key error")) } - + shardID := *shardIDFlag + var shardIDs []uint32 + shardIDs = append(shardIDs, uint32(shardID)) selfPeer := p2p.Peer{IP: *ip, Port: *port, ConsensusPubKey: peerPubKey} // Init with LibP2P enabled, FIXME: (leochen) right now we support only one shard - for i := 0; i < core.GenesisShardNum; i++ { - shardIDs = append(shardIDs, uint32(i)) - } - - // Do cross shard tx if there are more than one shard - setting := txgen.Settings{ + setting := Settings{ NumOfAddress: 10000, - CrossShard: false, // len(shardIDs) > 1, MaxNumTxsPerBatch: *maxNumTxsPerBatch, - CrossShardRatio: *crossShardRatio, } + utils.GetLogInstance().Debug("Cross Shard Ratio Is Set But not used", "cx ratio", *crossShardRatio) // TODO(Richard): refactor this chuck to a single method // Setup a logger to stdout and log file. - logFileName := fmt.Sprintf("./%v/txgen.log", *logFolder) + logFileName := fmt.Sprintf("./%v/txgen-%v.log", *logFolder, shardID) h := log.MultiHandler( log.StreamHandler(os.Stdout, log.TerminalFormat(false)), log.Must.FileHandler(logFileName, log.LogfmtFormat()), // Log to file @@ -115,83 +119,81 @@ func main() { log.Root().SetHandler(h) gsif, err := consensus.NewGenesisStakeInfoFinder() - if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "Cannot initialize stake info: %v\n", err) - os.Exit(1) - } - // Nodes containing blockchain data to mirror the shards' data in the network - nodes := []*node.Node{} - host, err := p2pimpl.NewHost(&selfPeer, nodePriKey) + + myhost, err := p2pimpl.NewHost(&selfPeer, nodePriKey) if err != nil { panic("unable to new host in txgen") } - for _, shardID := range shardIDs { - c := &consensus.Consensus{ShardID: shardID} - node := node.New(host, c, nil, false) - c.SetStakeInfoFinder(gsif) - c.ChainReader = node.Blockchain() - // Replace public keys with genesis accounts for the shard - c.PublicKeys = nil - startIdx := core.GenesisShardSize * shardID - endIdx := startIdx + core.GenesisShardSize - for _, acct := range contract.GenesisBLSAccounts[startIdx:endIdx] { - secretKey := bls2.SecretKey{} - if err := secretKey.SetHexString(acct.Private); err != nil { - _, _ = fmt.Fprintf(os.Stderr, "cannot parse secret key: %v\n", - err) - os.Exit(1) - } - c.PublicKeys = append(c.PublicKeys, secretKey.GetPublicKey()) - } - // Assign many fake addresses so we have enough address to play with at first - nodes = append(nodes, node) - } - - // Client/txgenerator server node setup - consensusObj, err := consensus.New(host, 0, p2p.Peer{}, nil) if err != nil { fmt.Fprintf(os.Stderr, "Error :%v \n", err) os.Exit(1) } - - clientNode := node.New(host, consensusObj, nil, false) - clientNode.Client = client.NewClient(clientNode.GetHost(), shardIDs) - + consensusObj, err := consensus.New(myhost, uint32(shardID), p2p.Peer{}, nil) + txGen := node.New(myhost, consensusObj, nil, false) //Changed it : no longer archival node. + txGen.Client = client.NewClient(txGen.GetHost(), shardIDs) consensusObj.SetStakeInfoFinder(gsif) - consensusObj.ChainReader = clientNode.Blockchain() - + consensusObj.ChainReader = txGen.Blockchain() + consensusObj.PublicKeys = nil + startIdx := 0 + endIdx := startIdx + core.GenesisShardSize + for _, acct := range contract.GenesisBLSAccounts[startIdx:endIdx] { + secretKey := bls2.SecretKey{} + if err := secretKey.SetHexString(acct.Private); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "cannot parse secret key: %v\n", + err) + os.Exit(1) + } + consensusObj.PublicKeys = append(consensusObj.PublicKeys, secretKey.GetPublicKey()) + } + txGen.NodeConfig.SetRole(nodeconfig.ClientNode) + txGen.NodeConfig.SetIsBeacon(true) + txGen.NodeConfig.SetIsClient(true) + txGen.NodeConfig.SetShardGroupID(p2p.GroupIDBeacon) + txGen.ServiceManagerSetup() + txGen.RunServices() + time.Sleep(20 * time.Second) + start := time.Now() + totalTime := float64(*duration) + ticker := time.NewTicker(checkFrequency * time.Second) + txGen.GetSync() +syncLoop: + for { + t := time.Now() + if totalTime > 0 && t.Sub(start).Seconds() >= totalTime { + utils.GetLogInstance().Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime) + break syncLoop + } + select { + case <-ticker.C: + if txGen.State.String() == "NodeReadyForConsensus" { + utils.GetLogInstance().Debug("Generator is now in Sync.", "txgen node", txGen.SelfPeer, "Node State", txGen.State.String()) + ticker.Stop() + break syncLoop + } + } + } readySignal := make(chan uint32) - // This func is used to update the client's blockchain when new blocks are received from the leaders updateBlocksFunc := func(blocks []*types.Block) { utils.GetLogInstance().Info("[Txgen] Received new block", "block num", blocks[0].NumberU64()) for _, block := range blocks { - for _, node := range nodes { - shardID := block.ShardID() - - if node.Consensus.ShardID == shardID { - // Add it to blockchain - utils.GetLogInstance().Info("Current Block", "block num", node.Blockchain().CurrentBlock().NumberU64()) - utils.GetLogInstance().Info("Adding block from leader", "txNum", len(block.Transactions()), "shardID", shardID, "preHash", block.ParentHash().Hex()) - node.AddNewBlock(block) - stateMutex.Lock() - node.Worker.UpdateCurrent() - stateMutex.Unlock() - readySignal <- shardID - } else { - continue - } + shardID := block.ShardID() + + if txGen.Consensus.ShardID == shardID { + // Add it to blockchain + utils.GetLogInstance().Info("Adding block from leader", "txNum", len(block.Transactions()), "shardID", shardID, "preHash", block.ParentHash().Hex(), "currentBlock", txGen.Blockchain().CurrentBlock().NumberU64(), "incoming block", block.NumberU64()) + txGen.AddNewBlock(block) + stateMutex.Lock() + txGen.Worker.UpdateCurrent() + stateMutex.Unlock() + readySignal <- shardID + } else { + continue } } } - clientNode.Client.UpdateBlocks = updateBlocksFunc - - clientNode.NodeConfig.SetRole(nodeconfig.ClientNode) - clientNode.NodeConfig.SetIsClient(true) - clientNode.ServiceManagerSetup() - clientNode.RunServices() - + txGen.Client.UpdateBlocks = updateBlocksFunc // Start the client server to listen to leader's message go func() { // wait for 3 seconds for client to send ping message to leader @@ -201,11 +203,6 @@ func main() { readySignal <- i } }() - - // Transaction generation process - start := time.Now() - totalTime := float64(*duration) - for { t := time.Now() if totalTime > 0 && t.Sub(start).Seconds() >= totalTime { @@ -214,25 +211,14 @@ func main() { } select { case shardID := <-readySignal: - shardIDTxsMap := make(map[uint32]types.Transactions) lock := sync.Mutex{} - - stateMutex.Lock() - utils.GetLogInstance().Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0)) - txs, _ := txgen.GenerateSimulatedTransactionsAccount(int(shardID), nodes, setting) - - lock.Lock() - // Put txs into corresponding shards - shardIDTxsMap[shardID] = append(shardIDTxsMap[shardID], txs...) - lock.Unlock() - stateMutex.Unlock() - - lock.Lock() - for shardID, txs := range shardIDTxsMap { // Send the txs to corresponding shards - go func(shardID uint32, txs types.Transactions) { - SendTxsToShard(clientNode, txs) - }(shardID, txs) + utils.GetLogInstance().Warn("STARTING TX GEN PUSH LOOP", "gomaxprocs", runtime.GOMAXPROCS(0)) + txs, err := GenerateSimulatedTransactionsAccount(uint32(shardID), txGen, setting) + if err != nil { + utils.GetLogInstance().Debug("Error in Generating Txns", "Err", err) } + lock.Lock() + SendTxsToShard(txGen, txs) lock.Unlock() case <-time.After(10 * time.Second): utils.GetLogInstance().Warn("No new block is received so far") @@ -241,14 +227,30 @@ func main() { // Send a stop message to stop the nodes at the end msg := proto_node.ConstructStopMessage() - clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg)) - clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, p2p_host.ConstructP2pMessage(byte(0), msg)) - + txGen.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg)) + txGen.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, p2p_host.ConstructP2pMessage(byte(0), msg)) time.Sleep(3 * time.Second) } // SendTxsToShard sends txs to shard, currently just to beacon shard func SendTxsToShard(clientNode *node.Node, txs types.Transactions) { msg := proto_node.ConstructTransactionListMessageAccount(txs) - clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg)) + err := clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg)) + if err != nil { + utils.GetLogInstance().Debug("Error in Sending Txns", "Err", err) + } +} + +// GenerateSimulatedTransactionsAccount generates simulated transaction for account model. +func GenerateSimulatedTransactionsAccount(shardID uint32, node *node.Node, setting Settings) (types.Transactions, error) { + _ = setting // TODO: make use of settings + txs := make([]*types.Transaction, 100) + for i := 0; i < 100; i++ { + baseNonce := node.Worker.GetCurrentState().GetNonce(crypto.PubkeyToAddress(node.TestBankKeys[i].PublicKey)) + randomUserAddress := crypto.PubkeyToAddress(node.TestBankKeys[rand.Intn(100)].PublicKey) + randAmount := rand.Float32() + tx, _ := types.SignTx(types.NewTransaction(baseNonce+uint64(0), randomUserAddress, shardID, big.NewInt(int64(params.Ether*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, node.TestBankKeys[i]) + txs[i] = tx + } + return txs, nil } diff --git a/internal/txgen/account_txs_generator.go b/internal/txgen/account_txs_generator.go deleted file mode 100644 index 9c0406670..000000000 --- a/internal/txgen/account_txs_generator.go +++ /dev/null @@ -1,36 +0,0 @@ -package txgen - -import ( - "math/big" - "math/rand" - - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/params" - "github.com/harmony-one/harmony/core/types" - "github.com/harmony-one/harmony/node" -) - -// Settings is the settings for TX generation. -type Settings struct { - NumOfAddress int - CrossShard bool - MaxNumTxsPerBatch int - CrossShardRatio int -} - -// GenerateSimulatedTransactionsAccount generates simulated transaction for account model. -func GenerateSimulatedTransactionsAccount(shardID int, dataNodes []*node.Node, setting Settings) (types.Transactions, types.Transactions) { - _ = setting // TODO: take use of settings - node := dataNodes[shardID] - txs := make([]*types.Transaction, 100) - for i := 0; i < 100; i++ { - baseNonce := node.Worker.GetCurrentState().GetNonce(crypto.PubkeyToAddress(node.TestBankKeys[i].PublicKey)) - for j := 0; j < 1; j++ { - randomUserAddress := crypto.PubkeyToAddress(node.TestBankKeys[rand.Intn(100)].PublicKey) - randAmount := rand.Float32() - tx, _ := types.SignTx(types.NewTransaction(baseNonce+uint64(j), randomUserAddress, uint32(shardID), big.NewInt(int64(params.Ether*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, node.TestBankKeys[i]) - txs[i*1+j] = tx - } - } - return txs, nil -} diff --git a/node/node.go b/node/node.go index f0fe3d234..f6b13e099 100644 --- a/node/node.go +++ b/node/node.go @@ -153,7 +153,7 @@ type Node struct { ContractAddresses []common.Address // For puzzle contracts - AddressNonce map[common.Address]*uint64 + AddressNonce sync.Map // Shard group Message Receiver shardGroupReceiver p2p.GroupReceiver @@ -325,8 +325,6 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database, is // Setup initial state of syncing. node.peerRegistrationRecord = make(map[string]*syncConfig) - node.AddressNonce = make(map[common.Address]*uint64) - node.startConsensus = make(chan struct{}) // Get the node config that's created in the harmony.go program. diff --git a/node/node_handler.go b/node/node_handler.go index f82f535e3..c17eea491 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -314,9 +314,9 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) { if err != nil { utils.GetLogInstance().Error("Error when parsing tx into message") } - if _, ok := node.AddressNonce[msg.From()]; ok { + if _, ok := node.AddressNonce.Load(msg.From()); ok { nonce := node.GetNonceOfAddress(msg.From()) - atomic.StoreUint64(node.AddressNonce[msg.From()], nonce) + node.AddressNonce.Store(msg.From(), nonce) } } diff --git a/node/node_syncing.go b/node/node_syncing.go index a6aa70f85..bcbf7edd6 100644 --- a/node/node_syncing.go +++ b/node/node_syncing.go @@ -40,6 +40,11 @@ func (node *Node) getNeighborPeers(neighbor *sync.Map) []p2p.Peer { return tmp } +// GetSync gets sync-ed to blockchain without joining consensus +func (node *Node) GetSync() { + go node.DoSyncing(node.blockchain, node.Worker, node.GetSyncingPeers, false) //Don't join consensus +} + // GetBeaconSyncingPeers returns a list of peers for beaconchain syncing func (node *Node) GetBeaconSyncingPeers() []p2p.Peer { return node.getNeighborPeers(&node.BeaconNeighbors) diff --git a/node/puzzle_contract.go b/node/puzzle_contract.go index bdf8041e5..e20735a11 100644 --- a/node/puzzle_contract.go +++ b/node/puzzle_contract.go @@ -214,11 +214,15 @@ func (node *Node) CreateTransactionForEndMethod(priKey string) (string, error) { // GetAndIncreaseAddressNonce get and increase the address's nonce func (node *Node) GetAndIncreaseAddressNonce(address common.Address) uint64 { - if value, ok := node.AddressNonce[address]; ok { - nonce := atomic.AddUint64(value, 1) + if value, ok := node.AddressNonce.Load(address); ok { + n, ok := value.(uint64) + if !ok { + return 0 + } + nonce := atomic.AddUint64(&n, 1) return nonce - 1 } nonce := node.GetNonceOfAddress(address) + 1 - node.AddressNonce[address] = &nonce + node.AddressNonce.Store(address, nonce) return nonce - 1 }