Merge pull request #917 from alajko/pTXGen

Shard-specific TXGen
pull/923/head
alajko 6 years ago committed by GitHub
commit 687b8ebdb6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      api/service/syncing/syncing.go
  2. 2
      cmd/client/txgen/README.md
  3. 93
      cmd/client/txgen/main.go
  4. 8
      node/node_syncing.go

@ -363,6 +363,7 @@ func (ss *StateSync) downloadBlocks(bc *core.BlockChain) {
var blockObj types.Block
// currently only send one block a time
err = rlp.DecodeBytes(payload[0], &blockObj)
if err != nil {
count++
utils.GetLogInstance().Debug("[SYNC] downloadBlocks: failed to DecodeBytes from received new block")
@ -607,6 +608,13 @@ func (ss *StateSync) getMaxPeerHeight() uint64 {
return maxHeight
}
// IsSameBlockchainHeight checks whether the node is out of sync from other peers
func (ss *StateSync) IsSameBlockchainHeight(bc *core.BlockChain) (uint64, bool) {
otherHeight := ss.getMaxPeerHeight()
currentHeight := bc.CurrentBlock().NumberU64()
return otherHeight, currentHeight == otherHeight
}
// IsOutOfSync checks whether the node is out of sync from other peers
func (ss *StateSync) IsOutOfSync(bc *core.BlockChain) bool {
otherHeight := ss.getMaxPeerHeight()

@ -1 +1,3 @@
TXGEN
The txgen program is used to simulate transactions and hit the Harmony network to loadtest its performance and robustness.
You can send txns to specific shards 1,2,3 or to shard 0. Sending it to shard 0, broadcasts txns to all the shards. (TODO: Investigate why?)

@ -60,14 +60,14 @@ func printVersion(me string) {
// processing.
var (
ip = flag.String("ip", "127.0.0.1", "IP of the node")
port = flag.String("port", "9999", "port of the node.")
maxNumTxsPerBatch = flag.Int("max_num_txs_per_batch", 20000, "number of transactions to send per message")
logFolder = flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
duration = flag.Int("duration", 30, "duration of the tx generation in second. If it's negative, the experiment runs forever.")
versionFlag = flag.Bool("version", false, "Output version info")
crossShardRatio = flag.Int("cross_shard_ratio", 30, "The percentage of cross shard transactions.") //Keeping this for backward compatibility
shardIDFlag = flag.Int("shardID", 0, "The shardID the node belongs to.")
ip = flag.String("ip", "127.0.0.1", "IP of the node")
port = flag.String("port", "9999", "port of the node.")
numTxns = flag.Int("numTxns", 100, "number of transactions to send per message")
logFolder = flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
duration = flag.Int("duration", 30, "duration of the tx generation in second. If it's negative, the experiment runs forever.")
versionFlag = flag.Bool("version", false, "Output version info")
crossShardRatio = flag.Int("cross_shard_ratio", 30, "The percentage of cross shard transactions.") //Keeping this for backward compatibility
shardIDFlag = flag.Int("shardID", 0, "The shardID the node belongs to.")
// Key file to store the private key
keyFile = flag.String("key", "./.txgenkey", "the private key file of the txgen")
// logging verbosity
@ -117,9 +117,15 @@ func setUpTXGen() *node.Node {
consensusObj.PublicKeys = append(consensusObj.PublicKeys, secretKey.GetPublicKey())
}
txGen.NodeConfig.SetRole(nodeconfig.ClientNode)
txGen.NodeConfig.SetIsBeacon(true)
if shardID == 0 {
txGen.NodeConfig.SetIsBeacon(true)
txGen.NodeConfig.SetShardGroupID(p2p.GroupIDBeacon)
} else {
txGen.NodeConfig.SetShardGroupID(p2p.NewGroupIDByShardID(p2p.ShardID(shardID)))
}
txGen.NodeConfig.SetIsClient(true)
txGen.NodeConfig.SetShardGroupID(p2p.GroupIDBeacon)
return txGen
}
func main() {
@ -143,8 +149,9 @@ func main() {
// Init with LibP2P enabled, FIXME: (leochen) right now we support only one shard
setting := Settings{
NumOfAddress: 10000,
MaxNumTxsPerBatch: *maxNumTxsPerBatch,
MaxNumTxsPerBatch: *numTxns,
}
shardID := *shardIDFlag
utils.GetLogInstance().Debug("Cross Shard Ratio Is Set But not used", "cx ratio", *crossShardRatio)
// TODO(Richard): refactor this chuck to a single method
@ -158,7 +165,6 @@ func main() {
txGen := setUpTXGen()
txGen.ServiceManagerSetup()
txGen.RunServices()
time.Sleep(20 * time.Second)
start := time.Now()
totalTime := float64(*duration)
utils.GetLogInstance().Debug("Total Duration", "totalTime", totalTime, "RunForever", isDurationForever(totalTime))
@ -187,12 +193,14 @@ syncLoop:
for _, block := range blocks {
shardID := block.ShardID()
if txGen.Consensus.ShardID == shardID {
utils.GetLogInstance().Info("Adding block from leader", "txNum", len(block.Transactions()), "shardID", shardID, "preHash", block.ParentHash().Hex(), "currentBlock", txGen.Blockchain().CurrentBlock().NumberU64(), "incoming block", block.NumberU64())
txGen.AddNewBlock(block)
stateMutex.Lock()
txGen.Worker.UpdateCurrent()
stateMutex.Unlock()
readySignal <- shardID
utils.GetLogInstance().Info("Got block from leader", "txNum", len(block.Transactions()), "shardID", shardID, "preHash", block.ParentHash().Hex(), "currentBlock", txGen.Blockchain().CurrentBlock().NumberU64(), "incoming block", block.NumberU64())
if block.NumberU64()-txGen.Blockchain().CurrentBlock().NumberU64() == 1 {
txGen.AddNewBlock(block)
stateMutex.Lock()
txGen.Worker.UpdateCurrent()
stateMutex.Unlock()
readySignal <- shardID
}
} else {
continue
}
@ -203,8 +211,8 @@ syncLoop:
go func() {
// wait for 3 seconds for client to send ping message to leader
// FIXME (leo) the readySignal should be set once we really sent ping message to leader
time.Sleep(3 * time.Second) // wait for nodes to be ready
readySignal <- uint32(0)
time.Sleep(1 * time.Second) // wait for nodes to be ready
readySignal <- uint32(shardID)
}()
pushLoop:
for {
@ -214,6 +222,17 @@ pushLoop:
utils.GetLogInstance().Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime)
break pushLoop
}
if shardID != 0 {
if otherHeight, flag := txGen.IsSameHeight(); flag {
if otherHeight >= 1 {
go func() {
readySignal <- uint32(shardID)
utils.GetLogInstance().Debug("Same blockchain height so readySignal generated")
time.Sleep(3 * time.Second) // wait for nodes to be ready
}()
}
}
}
select {
case shardID := <-readySignal:
lock := sync.Mutex{}
@ -223,7 +242,7 @@ pushLoop:
utils.GetLogInstance().Debug("Error in Generating Txns", "Err", err)
}
lock.Lock()
SendTxsToShard(txGen, txs)
SendTxsToShard(txGen, txs, uint32(shardID))
lock.Unlock()
case <-time.After(10 * time.Second):
utils.GetLogInstance().Warn("No new block is received so far")
@ -232,9 +251,15 @@ pushLoop:
}
// SendTxsToShard sends txs to shard, currently just to beacon shard
func SendTxsToShard(clientNode *node.Node, txs types.Transactions) {
func SendTxsToShard(clientNode *node.Node, txs types.Transactions, shardID uint32) {
msg := proto_node.ConstructTransactionListMessageAccount(txs)
err := clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg))
var err error
if shardID == 0 {
err = clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg))
} else {
clientGroup := p2p.NewClientGroupIDByShardID(p2p.ShardID(shardID))
err = clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{clientGroup}, p2p_host.ConstructP2pMessage(byte(0), msg))
}
if err != nil {
utils.GetLogInstance().Debug("Error in Sending Txns", "Err", err)
}
@ -242,14 +267,24 @@ func SendTxsToShard(clientNode *node.Node, txs types.Transactions) {
// GenerateSimulatedTransactionsAccount generates simulated transaction for account model.
func GenerateSimulatedTransactionsAccount(shardID uint32, node *node.Node, setting Settings) (types.Transactions, error) {
_ = setting // TODO: make use of settings
txs := make([]*types.Transaction, 100)
TxnsToGenerate := setting.MaxNumTxsPerBatch // TODO: make use of settings
txs := make([]*types.Transaction, TxnsToGenerate)
rounds := (TxnsToGenerate / 100)
remainder := TxnsToGenerate % 100
for i := 0; i < 100; i++ {
baseNonce := node.Worker.GetCurrentState().GetNonce(crypto.PubkeyToAddress(node.TestBankKeys[i].PublicKey))
randomUserAddress := crypto.PubkeyToAddress(node.TestBankKeys[rand.Intn(100)].PublicKey)
randAmount := rand.Float32()
tx, _ := types.SignTx(types.NewTransaction(baseNonce+uint64(0), randomUserAddress, shardID, big.NewInt(int64(params.Ether*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, node.TestBankKeys[i])
txs[i] = tx
for j := 0; j < rounds; j++ {
randomUserAddress := crypto.PubkeyToAddress(node.TestBankKeys[rand.Intn(100)].PublicKey)
randAmount := rand.Float32()
tx, _ := types.SignTx(types.NewTransaction(baseNonce+uint64(j), randomUserAddress, shardID, big.NewInt(int64(params.Ether*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, node.TestBankKeys[i])
txs[100*j+i] = tx
}
if i < remainder {
randomUserAddress := crypto.PubkeyToAddress(node.TestBankKeys[rand.Intn(100)].PublicKey)
randAmount := rand.Float32()
tx, _ := types.SignTx(types.NewTransaction(baseNonce+uint64(rounds), randomUserAddress, shardID, big.NewInt(int64(params.Ether*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, node.TestBankKeys[i])
txs[100*rounds+i] = tx
}
}
return txs, nil
}

@ -48,6 +48,14 @@ func (node *Node) DoSyncWithoutConsensus() {
go node.DoSyncing(node.Blockchain(), node.Worker, node.GetSyncingPeers, false) //Don't join consensus
}
// IsSameHeight tells whether node is at same bc height as a peer
func (node *Node) IsSameHeight() (uint64, bool) {
if node.stateSync == nil {
node.stateSync = syncing.CreateStateSync(node.SelfPeer.IP, node.SelfPeer.Port, node.GetSyncID())
}
return node.stateSync.IsSameBlockchainHeight(node.Blockchain())
}
// GetBeaconSyncingPeers returns a list of peers for beaconchain syncing
func (node *Node) GetBeaconSyncingPeers() []p2p.Peer {
return node.getNeighborPeers(&node.BeaconNeighbors)

Loading…
Cancel
Save