working local txgen

pull/836/head
ak 6 years ago
parent 7fa822cb66
commit 96576ef0c5
  1. 218
      cmd/client/txgen/main.go
  2. 36
      internal/txgen/account_txs_generator.go
  3. 4
      node/node.go
  4. 4
      node/node_handler.go
  5. 5
      node/node_syncing.go
  6. 10
      node/puzzle_contract.go

@ -3,27 +3,28 @@ package main
import (
"flag"
"fmt"
"math/big"
"math/rand"
"os"
"path"
"runtime"
"sync"
"time"
bls2 "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/utils/contract"
"github.com/harmony-one/harmony/crypto/bls"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
bls2 "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/api/client"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/txgen"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/internal/utils/contract"
"github.com/harmony-one/harmony/node"
"github.com/harmony-one/harmony/p2p"
p2p_host "github.com/harmony-one/harmony/p2p/host"
@ -38,6 +39,16 @@ var (
stateMutex sync.Mutex
)
const (
checkFrequency = 5 //checkfrequency checks whether the transaction generator is ready to send the next batch of transactions.
)
// Settings is the settings for TX generation. No Cross-Shard Support!
type Settings struct {
NumOfAddress int
MaxNumTxsPerBatch int
}
func printVersion(me string) {
fmt.Fprintf(os.Stderr, "Harmony (C) 2018. %v, version %v-%v (%v %v)\n", path.Base(me), version, commit, builtBy, builtAt)
os.Exit(0)
@ -53,14 +64,12 @@ func main() {
logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
duration := flag.Int("duration", 10, "duration of the tx generation in second. If it's negative, the experiment runs forever.")
versionFlag := flag.Bool("version", false, "Output version info")
crossShardRatio := flag.Int("cross_shard_ratio", 30, "The percentage of cross shard transactions.")
crossShardRatio := flag.Int("cross_shard_ratio", 30, "The percentage of cross shard transactions.") //Keeping this for backward compatibility
shardIDFlag := flag.Int("shardID", 0, "The shardID the node belongs to.")
// Key file to store the private key
keyFile := flag.String("key", "./.txgenkey", "the private key file of the txgen")
flag.Var(&utils.BootNodes, "bootnodes", "a list of bootnode multiaddress")
flag.Parse()
if *versionFlag {
printVersion(os.Args[0])
}
@ -79,7 +88,6 @@ func main() {
utils.BootNodes = bootNodeAddrs
}
var shardIDs []uint32
nodePriKey, _, err := utils.LoadKeyFromFile(*keyFile)
if err != nil {
panic(err)
@ -89,25 +97,21 @@ func main() {
if peerPubKey == nil {
panic(fmt.Errorf("generate key error"))
}
shardID := *shardIDFlag
var shardIDs []uint32
shardIDs = append(shardIDs, uint32(shardID))
selfPeer := p2p.Peer{IP: *ip, Port: *port, ConsensusPubKey: peerPubKey}
// Init with LibP2P enabled, FIXME: (leochen) right now we support only one shard
for i := 0; i < core.GenesisShardNum; i++ {
shardIDs = append(shardIDs, uint32(i))
}
// Do cross shard tx if there are more than one shard
setting := txgen.Settings{
setting := Settings{
NumOfAddress: 10000,
CrossShard: false, // len(shardIDs) > 1,
MaxNumTxsPerBatch: *maxNumTxsPerBatch,
CrossShardRatio: *crossShardRatio,
}
utils.GetLogInstance().Debug("Cross Shard Ratio Is Set But not used", "cx ratio", *crossShardRatio)
// TODO(Richard): refactor this chuck to a single method
// Setup a logger to stdout and log file.
logFileName := fmt.Sprintf("./%v/txgen.log", *logFolder)
logFileName := fmt.Sprintf("./%v/txgen-%v.log", *logFolder, shardID)
h := log.MultiHandler(
log.StreamHandler(os.Stdout, log.TerminalFormat(false)),
log.Must.FileHandler(logFileName, log.LogfmtFormat()), // Log to file
@ -115,83 +119,81 @@ func main() {
log.Root().SetHandler(h)
gsif, err := consensus.NewGenesisStakeInfoFinder()
if err != nil {
_, _ = fmt.Fprintf(os.Stderr, "Cannot initialize stake info: %v\n", err)
os.Exit(1)
}
// Nodes containing blockchain data to mirror the shards' data in the network
nodes := []*node.Node{}
host, err := p2pimpl.NewHost(&selfPeer, nodePriKey)
myhost, err := p2pimpl.NewHost(&selfPeer, nodePriKey)
if err != nil {
panic("unable to new host in txgen")
}
for _, shardID := range shardIDs {
c := &consensus.Consensus{ShardID: shardID}
node := node.New(host, c, nil, false)
c.SetStakeInfoFinder(gsif)
c.ChainReader = node.Blockchain()
// Replace public keys with genesis accounts for the shard
c.PublicKeys = nil
startIdx := core.GenesisShardSize * shardID
endIdx := startIdx + core.GenesisShardSize
for _, acct := range contract.GenesisBLSAccounts[startIdx:endIdx] {
secretKey := bls2.SecretKey{}
if err := secretKey.SetHexString(acct.Private); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "cannot parse secret key: %v\n",
err)
os.Exit(1)
}
c.PublicKeys = append(c.PublicKeys, secretKey.GetPublicKey())
}
// Assign many fake addresses so we have enough address to play with at first
nodes = append(nodes, node)
}
// Client/txgenerator server node setup
consensusObj, err := consensus.New(host, 0, p2p.Peer{}, nil)
if err != nil {
fmt.Fprintf(os.Stderr, "Error :%v \n", err)
os.Exit(1)
}
clientNode := node.New(host, consensusObj, nil, false)
clientNode.Client = client.NewClient(clientNode.GetHost(), shardIDs)
consensusObj, err := consensus.New(myhost, uint32(shardID), p2p.Peer{}, nil)
txGen := node.New(myhost, consensusObj, nil, false) //Changed it : no longer archival node.
txGen.Client = client.NewClient(txGen.GetHost(), shardIDs)
consensusObj.SetStakeInfoFinder(gsif)
consensusObj.ChainReader = clientNode.Blockchain()
consensusObj.ChainReader = txGen.Blockchain()
consensusObj.PublicKeys = nil
startIdx := 0
endIdx := startIdx + core.GenesisShardSize
for _, acct := range contract.GenesisBLSAccounts[startIdx:endIdx] {
secretKey := bls2.SecretKey{}
if err := secretKey.SetHexString(acct.Private); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "cannot parse secret key: %v\n",
err)
os.Exit(1)
}
consensusObj.PublicKeys = append(consensusObj.PublicKeys, secretKey.GetPublicKey())
}
txGen.NodeConfig.SetRole(nodeconfig.ClientNode)
txGen.NodeConfig.SetIsBeacon(true)
txGen.NodeConfig.SetIsClient(true)
txGen.NodeConfig.SetShardGroupID(p2p.GroupIDBeacon)
txGen.ServiceManagerSetup()
txGen.RunServices()
time.Sleep(20 * time.Second)
start := time.Now()
totalTime := float64(*duration)
ticker := time.NewTicker(checkFrequency * time.Second)
txGen.GetSync()
syncLoop:
for {
t := time.Now()
if totalTime > 0 && t.Sub(start).Seconds() >= totalTime {
utils.GetLogInstance().Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime)
break syncLoop
}
select {
case <-ticker.C:
if txGen.State.String() == "NodeReadyForConsensus" {
utils.GetLogInstance().Debug("Generator is now in Sync.", "txgen node", txGen.SelfPeer, "Node State", txGen.State.String())
ticker.Stop()
break syncLoop
}
}
}
readySignal := make(chan uint32)
// This func is used to update the client's blockchain when new blocks are received from the leaders
updateBlocksFunc := func(blocks []*types.Block) {
utils.GetLogInstance().Info("[Txgen] Received new block", "block num", blocks[0].NumberU64())
for _, block := range blocks {
for _, node := range nodes {
shardID := block.ShardID()
if node.Consensus.ShardID == shardID {
// Add it to blockchain
utils.GetLogInstance().Info("Current Block", "block num", node.Blockchain().CurrentBlock().NumberU64())
utils.GetLogInstance().Info("Adding block from leader", "txNum", len(block.Transactions()), "shardID", shardID, "preHash", block.ParentHash().Hex())
node.AddNewBlock(block)
stateMutex.Lock()
node.Worker.UpdateCurrent()
stateMutex.Unlock()
readySignal <- shardID
} else {
continue
}
shardID := block.ShardID()
if txGen.Consensus.ShardID == shardID {
// Add it to blockchain
utils.GetLogInstance().Info("Adding block from leader", "txNum", len(block.Transactions()), "shardID", shardID, "preHash", block.ParentHash().Hex(), "currentBlock", txGen.Blockchain().CurrentBlock().NumberU64(), "incoming block", block.NumberU64())
txGen.AddNewBlock(block)
stateMutex.Lock()
txGen.Worker.UpdateCurrent()
stateMutex.Unlock()
readySignal <- shardID
} else {
continue
}
}
}
clientNode.Client.UpdateBlocks = updateBlocksFunc
clientNode.NodeConfig.SetRole(nodeconfig.ClientNode)
clientNode.NodeConfig.SetIsClient(true)
clientNode.ServiceManagerSetup()
clientNode.RunServices()
txGen.Client.UpdateBlocks = updateBlocksFunc
// Start the client server to listen to leader's message
go func() {
// wait for 3 seconds for client to send ping message to leader
@ -201,11 +203,6 @@ func main() {
readySignal <- i
}
}()
// Transaction generation process
start := time.Now()
totalTime := float64(*duration)
for {
t := time.Now()
if totalTime > 0 && t.Sub(start).Seconds() >= totalTime {
@ -214,25 +211,14 @@ func main() {
}
select {
case shardID := <-readySignal:
shardIDTxsMap := make(map[uint32]types.Transactions)
lock := sync.Mutex{}
stateMutex.Lock()
utils.GetLogInstance().Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0))
txs, _ := txgen.GenerateSimulatedTransactionsAccount(int(shardID), nodes, setting)
lock.Lock()
// Put txs into corresponding shards
shardIDTxsMap[shardID] = append(shardIDTxsMap[shardID], txs...)
lock.Unlock()
stateMutex.Unlock()
lock.Lock()
for shardID, txs := range shardIDTxsMap { // Send the txs to corresponding shards
go func(shardID uint32, txs types.Transactions) {
SendTxsToShard(clientNode, txs)
}(shardID, txs)
utils.GetLogInstance().Warn("STARTING TX GEN PUSH LOOP", "gomaxprocs", runtime.GOMAXPROCS(0))
txs, err := GenerateSimulatedTransactionsAccount(uint32(shardID), txGen, setting)
if err != nil {
utils.GetLogInstance().Debug("Error in Generating Txns", "Err", err)
}
lock.Lock()
SendTxsToShard(txGen, txs)
lock.Unlock()
case <-time.After(10 * time.Second):
utils.GetLogInstance().Warn("No new block is received so far")
@ -241,14 +227,30 @@ func main() {
// Send a stop message to stop the nodes at the end
msg := proto_node.ConstructStopMessage()
clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg))
clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, p2p_host.ConstructP2pMessage(byte(0), msg))
txGen.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg))
txGen.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeacon}, p2p_host.ConstructP2pMessage(byte(0), msg))
time.Sleep(3 * time.Second)
}
// SendTxsToShard sends txs to shard, currently just to beacon shard
func SendTxsToShard(clientNode *node.Node, txs types.Transactions) {
msg := proto_node.ConstructTransactionListMessageAccount(txs)
clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg))
err := clientNode.GetHost().SendMessageToGroups([]p2p.GroupID{p2p.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg))
if err != nil {
utils.GetLogInstance().Debug("Error in Sending Txns", "Err", err)
}
}
// GenerateSimulatedTransactionsAccount generates simulated transaction for account model.
func GenerateSimulatedTransactionsAccount(shardID uint32, node *node.Node, setting Settings) (types.Transactions, error) {
_ = setting // TODO: make use of settings
txs := make([]*types.Transaction, 100)
for i := 0; i < 100; i++ {
baseNonce := node.Worker.GetCurrentState().GetNonce(crypto.PubkeyToAddress(node.TestBankKeys[i].PublicKey))
randomUserAddress := crypto.PubkeyToAddress(node.TestBankKeys[rand.Intn(100)].PublicKey)
randAmount := rand.Float32()
tx, _ := types.SignTx(types.NewTransaction(baseNonce+uint64(0), randomUserAddress, shardID, big.NewInt(int64(params.Ether*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, node.TestBankKeys[i])
txs[i] = tx
}
return txs, nil
}

@ -1,36 +0,0 @@
package txgen
import (
"math/big"
"math/rand"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/node"
)
// Settings is the settings for TX generation.
type Settings struct {
NumOfAddress int
CrossShard bool
MaxNumTxsPerBatch int
CrossShardRatio int
}
// GenerateSimulatedTransactionsAccount generates simulated transaction for account model.
func GenerateSimulatedTransactionsAccount(shardID int, dataNodes []*node.Node, setting Settings) (types.Transactions, types.Transactions) {
_ = setting // TODO: take use of settings
node := dataNodes[shardID]
txs := make([]*types.Transaction, 100)
for i := 0; i < 100; i++ {
baseNonce := node.Worker.GetCurrentState().GetNonce(crypto.PubkeyToAddress(node.TestBankKeys[i].PublicKey))
for j := 0; j < 1; j++ {
randomUserAddress := crypto.PubkeyToAddress(node.TestBankKeys[rand.Intn(100)].PublicKey)
randAmount := rand.Float32()
tx, _ := types.SignTx(types.NewTransaction(baseNonce+uint64(j), randomUserAddress, uint32(shardID), big.NewInt(int64(params.Ether*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, node.TestBankKeys[i])
txs[i*1+j] = tx
}
}
return txs, nil
}

@ -153,7 +153,7 @@ type Node struct {
ContractAddresses []common.Address
// For puzzle contracts
AddressNonce map[common.Address]*uint64
AddressNonce sync.Map
// Shard group Message Receiver
shardGroupReceiver p2p.GroupReceiver
@ -325,8 +325,6 @@ func New(host p2p.Host, consensusObj *consensus.Consensus, db ethdb.Database, is
// Setup initial state of syncing.
node.peerRegistrationRecord = make(map[string]*syncConfig)
node.AddressNonce = make(map[common.Address]*uint64)
node.startConsensus = make(chan struct{})
// Get the node config that's created in the harmony.go program.

@ -314,9 +314,9 @@ func (node *Node) PostConsensusProcessing(newBlock *types.Block) {
if err != nil {
utils.GetLogInstance().Error("Error when parsing tx into message")
}
if _, ok := node.AddressNonce[msg.From()]; ok {
if _, ok := node.AddressNonce.Load(msg.From()); ok {
nonce := node.GetNonceOfAddress(msg.From())
atomic.StoreUint64(node.AddressNonce[msg.From()], nonce)
node.AddressNonce.Store(msg.From(), nonce)
}
}

@ -40,6 +40,11 @@ func (node *Node) getNeighborPeers(neighbor *sync.Map) []p2p.Peer {
return tmp
}
// GetSync gets sync-ed to blockchain without joining consensus
func (node *Node) GetSync() {
go node.DoSyncing(node.blockchain, node.Worker, node.GetSyncingPeers, false) //Don't join consensus
}
// GetBeaconSyncingPeers returns a list of peers for beaconchain syncing
func (node *Node) GetBeaconSyncingPeers() []p2p.Peer {
return node.getNeighborPeers(&node.BeaconNeighbors)

@ -214,11 +214,15 @@ func (node *Node) CreateTransactionForEndMethod(priKey string) (string, error) {
// GetAndIncreaseAddressNonce get and increase the address's nonce
func (node *Node) GetAndIncreaseAddressNonce(address common.Address) uint64 {
if value, ok := node.AddressNonce[address]; ok {
nonce := atomic.AddUint64(value, 1)
if value, ok := node.AddressNonce.Load(address); ok {
n, ok := value.(uint64)
if !ok {
return 0
}
nonce := atomic.AddUint64(&n, 1)
return nonce - 1
}
nonce := node.GetNonceOfAddress(address) + 1
node.AddressNonce[address] = &nonce
node.AddressNonce.Store(address, nonce)
return nonce - 1
}

Loading…
Cancel
Save