package main
import (
"flag"
"fmt"
"math/big"
"math/rand"
"os"
"path"
"runtime"
"sync"
"time"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
bls2 "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/api/client"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/internal/utils/contract"
"github.com/harmony-one/harmony/node"
"github.com/harmony-one/harmony/p2p"
p2p_host "github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/p2p/p2pimpl"
)
var (
version string
builtBy string
builtAt string
commit string
stateMutex sync . Mutex
)
const (
checkFrequency = 5 //checkfrequency checks whether the transaction generator is ready to send the next batch of transactions.
)
// Settings is the settings for TX generation. No Cross-Shard Support!
type Settings struct {
NumOfAddress int
MaxNumTxsPerBatch int
}
func printVersion ( me string ) {
fmt . Fprintf ( os . Stderr , "Harmony (C) 2018. %v, version %v-%v (%v %v)\n" , path . Base ( me ) , version , commit , builtBy , builtAt )
os . Exit ( 0 )
}
// The main entrance for the transaction generator program which simulate transactions and send to the network for
// processing.
var (
ip = flag . String ( "ip" , "127.0.0.1" , "IP of the node" )
port = flag . String ( "port" , "9999" , "port of the node." )
maxNumTxsPerBatch = flag . Int ( "max_num_txs_per_batch" , 20000 , "number of transactions to send per message" )
logFolder = flag . String ( "log_folder" , "latest" , "the folder collecting the logs of this execution" )
duration = flag . Int ( "duration" , 10 , "duration of the tx generation in second. If it's negative, the experiment runs forever." )
versionFlag = flag . Bool ( "version" , false , "Output version info" )
crossShardRatio = flag . Int ( "cross_shard_ratio" , 30 , "The percentage of cross shard transactions." ) //Keeping this for backward compatibility
shardIDFlag = flag . Int ( "shardID" , 0 , "The shardID the node belongs to." )
// Key file to store the private key
keyFile = flag . String ( "key" , "./.txgenkey" , "the private key file of the txgen" )
)
func setUpTXGen ( ) * node . Node {
nodePriKey , _ , err := utils . LoadKeyFromFile ( * keyFile )
if err != nil {
panic ( err )
}
peerPubKey := bls . RandPrivateKey ( ) . GetPublicKey ( )
if peerPubKey == nil {
panic ( fmt . Errorf ( "generate key error" ) )
}
shardID := * shardIDFlag
var shardIDs [ ] uint32
shardIDs = append ( shardIDs , uint32 ( shardID ) )
selfPeer := p2p . Peer { IP : * ip , Port : * port , ConsensusPubKey : peerPubKey }
gsif , err := consensus . NewGenesisStakeInfoFinder ( )
// Nodes containing blockchain data to mirror the shards' data in the network
myhost , err := p2pimpl . NewHost ( & selfPeer , nodePriKey )
if err != nil {
panic ( "unable to new host in txgen" )
}
if err != nil {
fmt . Fprintf ( os . Stderr , "Error :%v \n" , err )
os . Exit ( 1 )
}
consensusObj , err := consensus . New ( myhost , uint32 ( shardID ) , p2p . Peer { } , nil )
txGen := node . New ( myhost , consensusObj , nil , false ) //Changed it : no longer archival node.
txGen . Client = client . NewClient ( txGen . GetHost ( ) , shardIDs )
consensusObj . SetStakeInfoFinder ( gsif )
consensusObj . ChainReader = txGen . Blockchain ( )
consensusObj . PublicKeys = nil
startIdx := 0
endIdx := startIdx + core . GenesisShardSize
for _ , acct := range contract . GenesisBLSAccounts [ startIdx : endIdx ] {
secretKey := bls2 . SecretKey { }
if err := secretKey . SetHexString ( acct . Private ) ; err != nil {
_ , _ = fmt . Fprintf ( os . Stderr , "cannot parse secret key: %v\n" ,
err )
os . Exit ( 1 )
}
consensusObj . PublicKeys = append ( consensusObj . PublicKeys , secretKey . GetPublicKey ( ) )
}
txGen . NodeConfig . SetRole ( nodeconfig . ClientNode )
txGen . NodeConfig . SetIsBeacon ( true )
txGen . NodeConfig . SetIsClient ( true )
txGen . NodeConfig . SetShardGroupID ( p2p . GroupIDBeacon )
return txGen
}
func main ( ) {
flag . Var ( & utils . BootNodes , "bootnodes" , "a list of bootnode multiaddress" )
flag . Parse ( )
if * versionFlag {
printVersion ( os . Args [ 0 ] )
}
// Add GOMAXPROCS to achieve max performance.
runtime . GOMAXPROCS ( 1024 )
// Logging setup
utils . SetPortAndIP ( * port , * ip )
if len ( utils . BootNodes ) == 0 {
bootNodeAddrs , err := utils . StringsToAddrs ( utils . DefaultBootNodeAddrStrings )
if err != nil {
panic ( err )
}
utils . BootNodes = bootNodeAddrs
}
// Init with LibP2P enabled, FIXME: (leochen) right now we support only one shard
setting := Settings {
NumOfAddress : 10000 ,
MaxNumTxsPerBatch : * maxNumTxsPerBatch ,
}
utils . GetLogInstance ( ) . Debug ( "Cross Shard Ratio Is Set But not used" , "cx ratio" , * crossShardRatio )
// TODO(Richard): refactor this chuck to a single method
// Setup a logger to stdout and log file.
logFileName := fmt . Sprintf ( "./%v/txgen.log" , * logFolder )
h := log . MultiHandler (
log . StreamHandler ( os . Stdout , log . TerminalFormat ( false ) ) ,
log . Must . FileHandler ( logFileName , log . LogfmtFormat ( ) ) , // Log to file
)
log . Root ( ) . SetHandler ( h )
txGen := setUpTXGen ( )
txGen . ServiceManagerSetup ( )
txGen . RunServices ( )
time . Sleep ( 20 * time . Second )
start := time . Now ( )
totalTime := float64 ( * duration )
ticker := time . NewTicker ( checkFrequency * time . Second )
txGen . DoSyncWithoutConsensus ( )
syncLoop :
for {
t := time . Now ( )
if totalTime > 0 && t . Sub ( start ) . Seconds ( ) >= totalTime {
utils . GetLogInstance ( ) . Debug ( "Generator timer ended in syncLoop." , "duration" , ( int ( t . Sub ( start ) ) ) , "startTime" , start , "totalTime" , totalTime )
break syncLoop
}
select {
case <- ticker . C :
if txGen . State . String ( ) == "NodeReadyForConsensus" {
utils . GetLogInstance ( ) . Debug ( "Generator is now in Sync." , "txgen node" , txGen . SelfPeer , "Node State" , txGen . State . String ( ) )
ticker . Stop ( )
break syncLoop
}
}
}
readySignal := make ( chan uint32 )
// This func is used to update the client's blockchain when new blocks are received from the leaders
updateBlocksFunc := func ( blocks [ ] * types . Block ) {
utils . GetLogInstance ( ) . Info ( "[Txgen] Received new block" , "block num" , blocks [ 0 ] . NumberU64 ( ) )
for _ , block := range blocks {
shardID := block . ShardID ( )
if txGen . Consensus . ShardID == shardID {
utils . GetLogInstance ( ) . Info ( "Adding block from leader" , "txNum" , len ( block . Transactions ( ) ) , "shardID" , shardID , "preHash" , block . ParentHash ( ) . Hex ( ) , "currentBlock" , txGen . Blockchain ( ) . CurrentBlock ( ) . NumberU64 ( ) , "incoming block" , block . NumberU64 ( ) )
txGen . AddNewBlock ( block )
stateMutex . Lock ( )
txGen . Worker . UpdateCurrent ( )
stateMutex . Unlock ( )
readySignal <- shardID
} else {
continue
}
}
}
txGen . Client . UpdateBlocks = updateBlocksFunc
// Start the client server to listen to leader's message
go func ( ) {
// wait for 3 seconds for client to send ping message to leader
// FIXME (leo) the readySignal should be set once we really sent ping message to leader
time . Sleep ( 3 * time . Second ) // wait for nodes to be ready
readySignal <- uint32 ( 0 )
} ( )
for {
t := time . Now ( )
if totalTime > 0 && t . Sub ( start ) . Seconds ( ) >= totalTime {
utils . GetLogInstance ( ) . Debug ( "Generator timer ended." , "duration" , ( int ( t . Sub ( start ) ) ) , "startTime" , start , "totalTime" , totalTime )
break
}
select {
case shardID := <- readySignal :
lock := sync . Mutex { }
utils . GetLogInstance ( ) . Warn ( "STARTING TX GEN PUSH LOOP" , "gomaxprocs" , runtime . GOMAXPROCS ( 0 ) )
txs , err := GenerateSimulatedTransactionsAccount ( uint32 ( shardID ) , txGen , setting )
if err != nil {
utils . GetLogInstance ( ) . Debug ( "Error in Generating Txns" , "Err" , err )
}
lock . Lock ( )
SendTxsToShard ( txGen , txs )
lock . Unlock ( )
case <- time . After ( 10 * time . Second ) :
utils . GetLogInstance ( ) . Warn ( "No new block is received so far" )
}
}
// Send a stop message to stop the nodes at the end
msg := proto_node . ConstructStopMessage ( )
txGen . GetHost ( ) . SendMessageToGroups ( [ ] p2p . GroupID { p2p . GroupIDBeaconClient } , p2p_host . ConstructP2pMessage ( byte ( 0 ) , msg ) )
time . Sleep ( 3 * time . Second )
}
// SendTxsToShard sends txs to shard, currently just to beacon shard
func SendTxsToShard ( clientNode * node . Node , txs types . Transactions ) {
msg := proto_node . ConstructTransactionListMessageAccount ( txs )
err := clientNode . GetHost ( ) . SendMessageToGroups ( [ ] p2p . GroupID { p2p . GroupIDBeaconClient } , p2p_host . ConstructP2pMessage ( byte ( 0 ) , msg ) )
if err != nil {
utils . GetLogInstance ( ) . Debug ( "Error in Sending Txns" , "Err" , err )
}
}
// GenerateSimulatedTransactionsAccount generates simulated transaction for account model.
func GenerateSimulatedTransactionsAccount ( shardID uint32 , node * node . Node , setting Settings ) ( types . Transactions , error ) {
_ = setting // TODO: make use of settings
txs := make ( [ ] * types . Transaction , 100 )
for i := 0 ; i < 100 ; i ++ {
baseNonce := node . Worker . GetCurrentState ( ) . GetNonce ( crypto . PubkeyToAddress ( node . TestBankKeys [ i ] . PublicKey ) )
randomUserAddress := crypto . PubkeyToAddress ( node . TestBankKeys [ rand . Intn ( 100 ) ] . PublicKey )
randAmount := rand . Float32 ( )
tx , _ := types . SignTx ( types . NewTransaction ( baseNonce + uint64 ( 0 ) , randomUserAddress , shardID , big . NewInt ( int64 ( params . Ether * randAmount ) ) , params . TxGas , nil , nil ) , types . HomesteadSigner { } , node . TestBankKeys [ i ] )
txs [ i ] = tx
}
return txs , nil
}