[project] Remove txgen, prefer stack based init & map literals (#2678)

pull/2681/head
Edgar Aroutiounian 5 years ago committed by GitHub
parent dfceae3691
commit 7da8ec0a02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      README.md
  2. 21
      api/proto/discovery/pingpong.go
  3. 9
      api/service/config.go
  4. 3
      cmd/client/txgen/README.md
  5. 328
      cmd/client/txgen/main.go
  6. 10
      node/node.go
  7. 5
      scripts/go_executable_build.sh
  8. 2
      test/deploy.sh
  9. 12
      test/deploy_newnode.sh
  10. 2
      test/kill_node.sh

@ -178,18 +178,6 @@ Harmony server / main node:
```
Wallet:
```bash
./scripts/go_executable_build.sh wallet
```
Tx Generator:
```bash
./scripts/go_executable_build.sh txgen
```
### Harmony docs and guides
https://docs.harmony.one

@ -33,8 +33,7 @@ func (p PingMessageType) String() string {
// NewPingMessage creates a new Ping message based on the p2p.Peer input
func NewPingMessage(peer p2p.Peer, isClient bool) *PingMessageType {
ping := new(PingMessageType)
ping := PingMessageType{}
ping.Version = proto.ProtocolVersion
ping.NodeVer = nodeconfig.GetVersion()
ping.Node.IP = peer.IP
@ -48,33 +47,27 @@ func NewPingMessage(peer p2p.Peer, isClient bool) *PingMessageType {
ping.Node.Role = node.ClientRole
}
return ping
return &ping
}
// GetPingMessage deserializes the Ping Message from a list of byte
func GetPingMessage(payload []byte) (*PingMessageType, error) {
ping := new(PingMessageType)
ping := PingMessageType{}
r := bytes.NewBuffer(payload)
decoder := gob.NewDecoder(r)
err := decoder.Decode(ping)
if err != nil {
if err := decoder.Decode(&ping); err != nil {
utils.Logger().Error().Err(err).Msg("[GetPingMessage] Decode")
return nil, fmt.Errorf("Decode Ping Error")
}
return ping, nil
return &ping, nil
}
// ConstructPingMessage contructs ping message from node to leader
func (p PingMessageType) ConstructPingMessage() []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})
byteBuffer.WriteByte(byte(node.PING))
byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node), byte(node.PING)})
encoder := gob.NewEncoder(byteBuffer)
err := encoder.Encode(p)
if err != nil {
if err := encoder.Encode(p); err != nil {
utils.Logger().Error().Err(err).Msg("[ConstructPingMessage] Encode")
return nil
}

@ -13,7 +13,7 @@ type NodeConfig struct {
Beacon nodeconfig.GroupID // the beacon group ID
ShardGroupID nodeconfig.GroupID // the group ID of the shard
Client nodeconfig.GroupID // the client group ID of the shard
IsClient bool // whether this node is a client node, such as wallet/txgen
IsClient bool // whether this node is a client node
IsBeacon bool // whether this node is a beacon node or not
ShardID uint32 // shardID of this node
Actions map[nodeconfig.GroupID]nodeconfig.ActionType // actions on the groups
@ -26,14 +26,11 @@ type NodeConfig struct {
// key is the shard ID
// value is the corresponding group ID
var (
GroupIDShards map[nodeconfig.ShardID]nodeconfig.GroupID
GroupIDShardClients map[nodeconfig.ShardID]nodeconfig.GroupID
GroupIDShards = map[nodeconfig.ShardID]nodeconfig.GroupID{}
GroupIDShardClients = map[nodeconfig.ShardID]nodeconfig.GroupID{}
)
func init() {
GroupIDShards = make(map[nodeconfig.ShardID]nodeconfig.GroupID)
GroupIDShardClients = make(map[nodeconfig.ShardID]nodeconfig.GroupID)
// init beacon chain group IDs
GroupIDShards[0] = nodeconfig.NewGroupIDByShardID(0)
GroupIDShardClients[0] = nodeconfig.NewClientGroupIDByShardID(0)

@ -1,3 +0,0 @@
TXGEN
The txgen program is used to simulate transactions and hit the Harmony network to loadtest its performance and robustness.
You can send txns to specific shards 1,2,3 or to shard 0. Sending it to shard 0, broadcasts txns to all the shards. (TODO: Investigate why?)

@ -1,328 +0,0 @@
package main
import (
"flag"
"fmt"
"math/big"
"math/rand"
"os"
"path"
"sync"
"time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
bls2 "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/api/client"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/common/denominations"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/consensus/quorum"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/genesis"
"github.com/harmony-one/harmony/internal/params"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node"
"github.com/harmony-one/harmony/p2p"
p2p_host "github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/p2p/p2pimpl"
p2putils "github.com/harmony-one/harmony/p2p/utils"
"github.com/harmony-one/harmony/shard"
)
var (
version string
builtBy string
builtAt string
commit string
stateMutex sync.Mutex
)
const (
checkFrequency = 2 //checkfrequency checks whether the transaction generator is ready to send the next batch of transactions.
)
// Settings is the settings for TX generation. No Cross-Shard Support!
type Settings struct {
NumOfAddress int
MaxNumTxsPerBatch int
}
func printVersion(me string) {
fmt.Fprintf(os.Stderr, "Harmony (C) 2019. %v, version %v-%v (%v %v)\n", path.Base(me), version, commit, builtBy, builtAt)
os.Exit(0)
}
// The main entrance for the transaction generator program which simulate transactions and send to the network for
// processing.
var (
ip = flag.String("ip", "127.0.0.1", "IP of the node")
port = flag.String("port", "9999", "port of the node.")
numTxns = flag.Int("numTxns", 100, "number of transactions to send per message")
logFolder = flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
duration = flag.Int("duration", 30, "duration of the tx generation in second. If it's negative, the experiment runs forever.")
versionFlag = flag.Bool("version", false, "Output version info")
crossShardRatio = flag.Int("cross_shard_ratio", 30, "The percentage of cross shard transactions.") //Keeping this for backward compatibility
shardIDFlag = flag.Int("shardID", 0, "The shardID the node belongs to.")
// Key file to store the private key
keyFile = flag.String("key", "./.txgenkey", "the private key file of the txgen")
// logging verbosity
verbosity = flag.Int("verbosity", 5, "Logging verbosity: 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=detail (default: 5)")
)
func setUpTXGen() *node.Node {
nodePriKey, _, err := utils.LoadKeyFromFile(*keyFile)
if err != nil {
utils.FatalErrMsg(err, "cannot load key from %s", *keyFile)
}
peerPubKey := bls.RandPrivateKey().GetPublicKey()
if peerPubKey == nil {
utils.FatalErrMsg(err, "cannot generate BLS key")
}
shardID := *shardIDFlag
selfPeer := p2p.Peer{IP: *ip, Port: *port, ConsensusPubKey: peerPubKey}
// Nodes containing blockchain data to mirror the shards' data in the network
myhost, err := p2pimpl.NewHost(&selfPeer, nodePriKey)
if err != nil {
fmt.Fprintf(os.Stderr, "Error :%v \n", err)
os.Exit(1)
}
decider := quorum.NewDecider(
quorum.SuperMajorityVote, uint32(shardID),
)
consensusObj, err := consensus.New(myhost, uint32(shardID), p2p.Peer{}, nil, decider)
chainDBFactory := &shardchain.MemDBFactory{}
txGen := node.New(myhost, consensusObj, chainDBFactory, nil, false) //Changed it : no longer archival node.
txGen.Client = client.NewClient(txGen.GetHost(), uint32(shardID))
consensusObj.ChainReader = txGen.Blockchain()
genesisShardingConfig := shard.Schedule.InstanceForEpoch(big.NewInt(core.GenesisEpoch))
startIdx := 0
endIdx := startIdx + genesisShardingConfig.NumNodesPerShard()
pubs := []*bls2.PublicKey{}
for _, acct := range genesis.HarmonyAccounts[startIdx:endIdx] {
pub := &bls2.PublicKey{}
if err := pub.DeserializeHexStr(acct.BLSPublicKey); err != nil {
fmt.Printf("Can not deserialize public key. err: %v", err)
os.Exit(1)
}
pubs = append(pubs, pub)
}
consensusObj.Decider.UpdateParticipants(pubs)
txGen.NodeConfig.SetRole(nodeconfig.Validator)
if shardID == 0 {
txGen.NodeConfig.SetShardGroupID(nodeconfig.GroupIDBeacon)
} else {
txGen.NodeConfig.SetShardGroupID(nodeconfig.NewGroupIDByShardID(nodeconfig.ShardID(shardID)))
}
txGen.NodeConfig.SetIsClient(true)
return txGen
}
func main() {
flag.Var(&p2putils.BootNodes, "bootnodes", "a list of bootnode multiaddress")
flag.Parse()
if *versionFlag {
printVersion(os.Args[0])
}
// Logging setup
utils.SetLogContext(*port, *ip)
utils.SetLogVerbosity(log.Lvl(*verbosity))
if len(p2putils.BootNodes) == 0 {
bootNodeAddrs, err := p2putils.StringsToAddrs(p2putils.DefaultBootNodeAddrStrings)
if err != nil {
utils.FatalErrMsg(err, "cannot parse default bootnode list")
}
p2putils.BootNodes = bootNodeAddrs
}
// Init with LibP2P enabled, FIXME: (leochen) right now we support only one shard
setting := Settings{
NumOfAddress: 10000,
MaxNumTxsPerBatch: *numTxns,
}
shardID := *shardIDFlag
utils.Logger().Debug().
Int("cx ratio", *crossShardRatio).
Msg("Cross Shard Ratio Is Set But not used")
// TODO(Richard): refactor this chuck to a single method
// Setup a logger to stdout and log file.
logFileName := fmt.Sprintf("./%v/txgen.log", *logFolder)
h := log.MultiHandler(
log.StreamHandler(os.Stdout, log.TerminalFormat(false)),
log.Must.FileHandler(logFileName, log.LogfmtFormat()), // Log to file
)
log.Root().SetHandler(h)
txGen := setUpTXGen()
txGen.ServiceManagerSetup()
txGen.RunServices()
start := time.Now()
totalTime := float64(*duration)
utils.Logger().Debug().
Float64("totalTime", totalTime).
Bool("RunForever", isDurationForever(totalTime)).
Msg("Total Duration")
ticker := time.NewTicker(checkFrequency * time.Second)
txGen.DoSyncWithoutConsensus()
syncLoop:
for {
t := time.Now()
if totalTime > 0 && t.Sub(start).Seconds() >= totalTime {
utils.Logger().Debug().
Int("duration", (int(t.Sub(start)))).
Time("startTime", start).
Float64("totalTime", totalTime).
Msg("Generator timer ended in syncLoop.")
break syncLoop
}
select {
case <-ticker.C:
if txGen.State.String() == "NodeReadyForConsensus" {
utils.Logger().Debug().
Str("txgen node", txGen.SelfPeer.String()).
Str("Node State", txGen.State.String()).
Msg("Generator is now in Sync.")
ticker.Stop()
break syncLoop
}
}
}
readySignal := make(chan uint32)
// This func is used to update the client's blockchain when new blocks are received from the leaders
updateBlocksFunc := func(blocks []*types.Block) {
utils.Logger().Info().
Uint64("block num", blocks[0].NumberU64()).
Msg("[Txgen] Received new block")
for _, block := range blocks {
shardID := block.ShardID()
if txGen.Consensus.ShardID == shardID {
utils.Logger().Info().
Int("txNum", len(block.Transactions())).
Uint32("shardID", shardID).
Str("preHash", block.ParentHash().Hex()).
Uint64("currentBlock", txGen.Blockchain().CurrentBlock().NumberU64()).
Uint64("incoming block", block.NumberU64()).
Msg("Got block from leader")
if block.NumberU64()-txGen.Blockchain().CurrentBlock().NumberU64() == 1 {
if _, err := txGen.Blockchain().InsertChain([]*types.Block{block}, true); err != nil {
utils.Logger().Error().
Err(err).
Msg("Error when adding new block")
}
stateMutex.Lock()
if err := txGen.Worker.UpdateCurrent(); err != nil {
utils.Logger().Warn().Err(err).Msg("(*Worker).UpdateCurrent failed")
}
stateMutex.Unlock()
readySignal <- shardID
}
} else {
continue
}
}
}
txGen.Client.UpdateBlocks = updateBlocksFunc
// Start the client server to listen to leader's message
go func() {
// wait for 3 seconds for client to send ping message to leader
// FIXME (leo) the readySignal should be set once we really sent ping message to leader
time.Sleep(1 * time.Second) // wait for nodes to be ready
readySignal <- uint32(shardID)
}()
pushLoop:
for {
t := time.Now()
utils.Logger().Debug().
Float64("running time", t.Sub(start).Seconds()).
Float64("totalTime", totalTime).
Msg("Current running time")
if !isDurationForever(totalTime) && t.Sub(start).Seconds() >= totalTime {
utils.Logger().Debug().
Int("duration", (int(t.Sub(start)))).
Time("startTime", start).
Float64("totalTime", totalTime).
Msg("Generator timer ended.")
break pushLoop
}
if shardID != 0 {
if otherHeight, flag := txGen.IsSameHeight(); flag {
if otherHeight >= 1 {
go func() {
readySignal <- uint32(shardID)
utils.Logger().Debug().Msg("Same blockchain height so readySignal generated")
time.Sleep(3 * time.Second) // wait for nodes to be ready
}()
}
}
}
select {
case shardID := <-readySignal:
lock := sync.Mutex{}
txs, err := GenerateSimulatedTransactionsAccount(uint32(shardID), txGen, setting)
if err != nil {
utils.Logger().Debug().
Err(err).
Msg("Error in Generating Txns")
}
lock.Lock()
SendTxsToShard(txGen, txs, uint32(shardID))
lock.Unlock()
case <-time.After(10 * time.Second):
utils.Logger().Warn().Msg("No new block is received so far")
}
}
}
// SendTxsToShard sends txs to shard, currently just to beacon shard
func SendTxsToShard(clientNode *node.Node, txs types.Transactions, shardID uint32) {
msg := proto_node.ConstructTransactionListMessageAccount(txs)
var err error
if shardID == 0 {
err = clientNode.GetHost().SendMessageToGroups([]nodeconfig.GroupID{nodeconfig.GroupIDBeaconClient}, p2p_host.ConstructP2pMessage(byte(0), msg))
} else {
clientGroup := nodeconfig.NewClientGroupIDByShardID(nodeconfig.ShardID(shardID))
err = clientNode.GetHost().SendMessageToGroups([]nodeconfig.GroupID{clientGroup}, p2p_host.ConstructP2pMessage(byte(0), msg))
}
if err != nil {
utils.Logger().Debug().
Err(err).
Msg("Error in Sending Txns")
}
}
// GenerateSimulatedTransactionsAccount generates simulated transaction for account model.
func GenerateSimulatedTransactionsAccount(shardID uint32, node *node.Node, setting Settings) (types.Transactions, error) {
TxnsToGenerate := setting.MaxNumTxsPerBatch // TODO: make use of settings
txs := make([]*types.Transaction, TxnsToGenerate)
rounds := (TxnsToGenerate / 100)
remainder := TxnsToGenerate % 100
for i := 0; i < 100; i++ {
baseNonce := node.Worker.GetCurrentState().GetNonce(crypto.PubkeyToAddress(node.TestBankKeys[i].PublicKey))
for j := 0; j < rounds; j++ {
randomUserAddress := crypto.PubkeyToAddress(node.TestBankKeys[rand.Intn(100)].PublicKey)
randAmount := rand.Float32()
tx, _ := types.SignTx(types.NewTransaction(baseNonce+uint64(j), randomUserAddress, shardID, big.NewInt(int64(denominations.One*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, node.TestBankKeys[i])
txs[100*j+i] = tx
}
if i < remainder {
randomUserAddress := crypto.PubkeyToAddress(node.TestBankKeys[rand.Intn(100)].PublicKey)
randAmount := rand.Float32()
tx, _ := types.SignTx(types.NewTransaction(baseNonce+uint64(rounds), randomUserAddress, shardID, big.NewInt(int64(denominations.One*randAmount)), params.TxGas, nil, nil), types.HomesteadSigner{}, node.TestBankKeys[i])
txs[100*rounds+i] = tx
}
}
return txs, nil
}
func isDurationForever(duration float64) bool {
return duration <= 0
}

@ -426,17 +426,13 @@ func (node *Node) startRxPipeline(
// StartServer starts a server and process the requests by a handler.
func (node *Node) StartServer() {
// client messages are sent by clients, like txgen, wallet
// client messages are for just spectators, like plain observers
node.startRxPipeline(node.clientReceiver, node.clientRxQueue, ClientRxWorkers)
// start the goroutine to receive group message
// start the goroutine to receive in my subcommittee messages
node.startRxPipeline(node.shardGroupReceiver, node.shardRxQueue, ShardRxWorkers)
// start the goroutine to receive global message, used for cross-shard TX
// start the goroutine to receive supercommittee level messages
// FIXME (leo): we use beacon client topic as the global topic for now
node.startRxPipeline(node.globalGroupReceiver, node.globalRxQueue, GlobalRxWorkers)
select {}
}

@ -4,7 +4,6 @@ export GO111MODULE=on
declare -A SRC
SRC[harmony]=cmd/harmony/main.go
# SRC[txgen]=cmd/client/txgen/main.go
SRC[bootnode]=cmd/bootnode/main.go
BINDIR=bin
@ -69,7 +68,7 @@ ACTION:
upload upload binaries to s3
release upload binaries to release bucket
harmony|txgen|bootnode|
harmony|bootnode|
only build the specified binary
EXAMPLES:
@ -274,6 +273,6 @@ case "$ACTION" in
"build") build_only ;;
"upload") upload ;;
"release") release ;;
"harmony"|"txgen"|"bootnode") build_only $ACTION ;;
"harmony"|"bootnode") build_only $ACTION ;;
*) usage ;;
esac

@ -78,7 +78,7 @@ USAGE: $ME [OPTIONS] config_file_name [extra args to node]
-N network network type (default: $NETWORK)
-B don't build the binary
This script will build all the binaries and start harmony and txgen based on the configuration file.
This script will build all the binaries and start harmony and based on the configuration file.
EXAMPLES:

@ -19,7 +19,7 @@ function check_result() {
}
function cleanup() {
for pid in `/bin/ps -fu $USER| grep "harmony\|txgen\|soldier\|commander\|profiler\|beacon\|bootnode" | grep -v "grep" | grep -v "vi" | awk '{print $2}'`;
for pid in `/bin/ps -fu $USER| grep "harmony\|soldier\|commander\|profiler\|beacon\|bootnode" | grep -v "grep" | grep -v "vi" | awk '{print $2}'`;
do
echo 'Killed process: '$pid
$DRYRUN kill -9 $pid 2> /dev/null
@ -50,15 +50,13 @@ USAGE: $ME [OPTIONS] config_file_name
-h print this help message
-d enable db support (default: $DB)
-t toggle txgen (default: $TXGEN)
-D duration txgen run duration (default: $DURATION)
-m min_peers minimal number of peers to start consensus (default: $MIN)
-s shards number of shards (default: $SHARDS)
-k nodeport kill the node with specified port number (default: $KILLPORT)
-n dryrun mode (default: $DRYRUN)
-S enable sync test (default: $SYNC)
This script will build all the binaries and start harmony and txgen based on the configuration file.
This script will build all the binaries and start harmony and based on the configuration file.
EXAMPLES:
@ -70,20 +68,16 @@ EOU
}
DB=
TXGEN=true
DURATION=90
MIN=5
SHARDS=2
KILLPORT=9004
SYNC=true
DRYRUN=
while getopts "hdtD:m:s:k:nSP" option; do
while getopts "hd:m:s:k:nSP" option; do
case $option in
h) usage ;;
d) DB='-db_supported' ;;
t) TXGEN=false ;;
D) DURATION=$OPTARG ;;
m) MIN=$OPTARG ;;
s) SHARDS=$OPTARG ;;
k) KILLPORT=$OPTARG ;;

@ -1,3 +1,3 @@
#!/bin/bash
pkill -9 '^(harmony|txgen|soldier|commander|profiler|bootnode)$' | sed 's/^/Killed process: /'
pkill -9 '^(harmony|soldier|commander|profiler|bootnode)$' | sed 's/^/Killed process: /'
rm -rf db-127.0.0.1-*

Loading…
Cancel
Save