package main
import (
"flag"
"fmt"
"math/big"
"math/rand"
"os"
"path"
"runtime"
"sync"
"time"
"github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/core"
"github.com/harmony-one/harmony/internal/ctxerror"
"github.com/harmony-one/harmony/internal/shardchain"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/params"
bls2 "github.com/harmony-one/bls/ffi/go/bls"
"github.com/harmony-one/harmony/api/client"
proto_node "github.com/harmony-one/harmony/api/proto/node"
"github.com/harmony-one/harmony/common/denominations"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/bls"
nodeconfig "github.com/harmony-one/harmony/internal/configs/node"
"github.com/harmony-one/harmony/internal/genesis"
"github.com/harmony-one/harmony/internal/utils"
"github.com/harmony-one/harmony/node"
"github.com/harmony-one/harmony/p2p"
p2p_host "github.com/harmony-one/harmony/p2p/host"
"github.com/harmony-one/harmony/p2p/p2pimpl"
)
var (
version string
builtBy string
builtAt string
commit string
stateMutex sync . Mutex
)
const (
checkFrequency = 2 //checkfrequency checks whether the transaction generator is ready to send the next batch of transactions.
)
// Settings is the settings for TX generation. No Cross-Shard Support!
type Settings struct {
NumOfAddress int
MaxNumTxsPerBatch int
}
func printVersion ( me string ) {
fmt . Fprintf ( os . Stderr , "Harmony (C) 2018. %v, version %v-%v (%v %v)\n" , path . Base ( me ) , version , commit , builtBy , builtAt )
os . Exit ( 0 )
}
// The main entrance for the transaction generator program which simulate transactions and send to the network for
// processing.
var (
ip = flag . String ( "ip" , "127.0.0.1" , "IP of the node" )
port = flag . String ( "port" , "9999" , "port of the node." )
numTxns = flag . Int ( "numTxns" , 100 , "number of transactions to send per message" )
logFolder = flag . String ( "log_folder" , "latest" , "the folder collecting the logs of this execution" )
duration = flag . Int ( "duration" , 30 , "duration of the tx generation in second. If it's negative, the experiment runs forever." )
versionFlag = flag . Bool ( "version" , false , "Output version info" )
crossShardRatio = flag . Int ( "cross_shard_ratio" , 30 , "The percentage of cross shard transactions." ) //Keeping this for backward compatibility
shardIDFlag = flag . Int ( "shardID" , 0 , "The shardID the node belongs to." )
// Key file to store the private key
keyFile = flag . String ( "key" , "./.txgenkey" , "the private key file of the txgen" )
// logging verbosity
verbosity = flag . Int ( "verbosity" , 5 , "Logging verbosity: 0=silent, 1=error, 2=warn, 3=info, 4=debug, 5=detail (default: 5)" )
)
func setUpTXGen ( ) * node . Node {
nodePriKey , _ , err := utils . LoadKeyFromFile ( * keyFile )
if err != nil {
panic ( err )
}
peerPubKey := bls . RandPrivateKey ( ) . GetPublicKey ( )
if peerPubKey == nil {
panic ( fmt . Errorf ( "generate key error" ) )
}
shardID := * shardIDFlag
selfPeer := p2p . Peer { IP : * ip , Port : * port , ConsensusPubKey : peerPubKey }
gsif , err := consensus . NewGenesisStakeInfoFinder ( )
// Nodes containing blockchain data to mirror the shards' data in the network
myhost , err := p2pimpl . NewHost ( & selfPeer , nodePriKey )
if err != nil {
panic ( "unable to new host in txgen" )
}
if err != nil {
fmt . Fprintf ( os . Stderr , "Error :%v \n" , err )
os . Exit ( 1 )
}
consensusObj , err := consensus . New ( myhost , uint32 ( shardID ) , p2p . Peer { } , nil )
chainDBFactory := & shardchain . MemDBFactory { }
txGen := node . New ( myhost , consensusObj , chainDBFactory , false ) //Changed it : no longer archival node.
txGen . Client = client . NewClient ( txGen . GetHost ( ) , uint32 ( shardID ) )
consensusObj . SetStakeInfoFinder ( gsif )
consensusObj . ChainReader = txGen . Blockchain ( )
consensusObj . PublicKeys = nil
startIdx := 0
endIdx := startIdx + core . GenesisShardSize
for _ , acct := range genesis . GenesisAccounts [ startIdx : endIdx ] {
pub := & bls2 . PublicKey { }
if err := pub . DeserializeHexStr ( acct . BlsPublicKey ) ; err != nil {
fmt . Printf ( "Can not deserialize public key. err: %v" , err )
os . Exit ( 1 )
}
consensusObj . PublicKeys = append ( consensusObj . PublicKeys , pub )
}
txGen . NodeConfig . SetRole ( nodeconfig . ClientNode )
if shardID == 0 {
txGen . NodeConfig . SetIsBeacon ( true )
txGen . NodeConfig . SetShardGroupID ( p2p . GroupIDBeacon )
} else {
txGen . NodeConfig . SetShardGroupID ( p2p . NewGroupIDByShardID ( p2p . ShardID ( shardID ) ) )
}
txGen . NodeConfig . SetIsClient ( true )
return txGen
}
func main ( ) {
flag . Var ( & utils . BootNodes , "bootnodes" , "a list of bootnode multiaddress" )
flag . Parse ( )
if * versionFlag {
printVersion ( os . Args [ 0 ] )
}
// Add GOMAXPROCS to achieve max performance.
runtime . GOMAXPROCS ( 1024 )
// Logging setup
utils . SetLogContext ( * port , * ip )
utils . SetLogVerbosity ( log . Lvl ( * verbosity ) )
if len ( utils . BootNodes ) == 0 {
bootNodeAddrs , err := utils . StringsToAddrs ( utils . DefaultBootNodeAddrStrings )
if err != nil {
panic ( err )
}
utils . BootNodes = bootNodeAddrs
}
// Init with LibP2P enabled, FIXME: (leochen) right now we support only one shard
setting := Settings {
NumOfAddress : 10000 ,
MaxNumTxsPerBatch : * numTxns ,
}
shardID := * shardIDFlag
utils . GetLogInstance ( ) . Debug ( "Cross Shard Ratio Is Set But not used" , "cx ratio" , * crossShardRatio )
// TODO(Richard): refactor this chuck to a single method
// Setup a logger to stdout and log file.
logFileName := fmt . Sprintf ( "./%v/txgen.log" , * logFolder )
h := log . MultiHandler (
log . StreamHandler ( os . Stdout , log . TerminalFormat ( false ) ) ,
log . Must . FileHandler ( logFileName , log . LogfmtFormat ( ) ) , // Log to file
)
log . Root ( ) . SetHandler ( h )
txGen := setUpTXGen ( )
txGen . ServiceManagerSetup ( )
txGen . RunServices ( )
start := time . Now ( )
totalTime := float64 ( * duration )
utils . GetLogInstance ( ) . Debug ( "Total Duration" , "totalTime" , totalTime , "RunForever" , isDurationForever ( totalTime ) )
ticker := time . NewTicker ( checkFrequency * time . Second )
txGen . DoSyncWithoutConsensus ( )
syncLoop :
for {
t := time . Now ( )
if totalTime > 0 && t . Sub ( start ) . Seconds ( ) >= totalTime {
utils . GetLogInstance ( ) . Debug ( "Generator timer ended in syncLoop." , "duration" , ( int ( t . Sub ( start ) ) ) , "startTime" , start , "totalTime" , totalTime )
break syncLoop
}
select {
case <- ticker . C :
if txGen . State . String ( ) == "NodeReadyForConsensus" {
utils . GetLogInstance ( ) . Debug ( "Generator is now in Sync." , "txgen node" , txGen . SelfPeer , "Node State" , txGen . State . String ( ) )
ticker . Stop ( )
break syncLoop
}
}
}
readySignal := make ( chan uint32 )
// This func is used to update the client's blockchain when new blocks are received from the leaders
updateBlocksFunc := func ( blocks [ ] * types . Block ) {
utils . GetLogInstance ( ) . Info ( "[Txgen] Received new block" , "block num" , blocks [ 0 ] . NumberU64 ( ) )
for _ , block := range blocks {
shardID := block . ShardID ( )
if txGen . Consensus . ShardID == shardID {
utils . GetLogInstance ( ) . Info ( "Got block from leader" , "txNum" , len ( block . Transactions ( ) ) , "shardID" , shardID , "preHash" , block . ParentHash ( ) . Hex ( ) , "currentBlock" , txGen . Blockchain ( ) . CurrentBlock ( ) . NumberU64 ( ) , "incoming block" , block . NumberU64 ( ) )
if block . NumberU64 ( ) - txGen . Blockchain ( ) . CurrentBlock ( ) . NumberU64 ( ) == 1 {
txGen . AddNewBlock ( block )
stateMutex . Lock ( )
if err := txGen . Worker . UpdateCurrent ( ) ; err != nil {
ctxerror . Warn ( utils . GetLogger ( ) , err ,
"(*Worker).UpdateCurrent failed" )
}
stateMutex . Unlock ( )
readySignal <- shardID
}
} else {
continue
}
}
}
txGen . Client . UpdateBlocks = updateBlocksFunc
// Start the client server to listen to leader's message
go func ( ) {
// wait for 3 seconds for client to send ping message to leader
// FIXME (leo) the readySignal should be set once we really sent ping message to leader
time . Sleep ( 1 * time . Second ) // wait for nodes to be ready
readySignal <- uint32 ( shardID )
} ( )
pushLoop :
for {
t := time . Now ( )
utils . GetLogInstance ( ) . Debug ( "Current running time" , "running time" , t . Sub ( start ) . Seconds ( ) , "totaltime" , totalTime )
if ! isDurationForever ( totalTime ) && t . Sub ( start ) . Seconds ( ) >= totalTime {
utils . GetLogInstance ( ) . Debug ( "Generator timer ended." , "duration" , ( int ( t . Sub ( start ) ) ) , "startTime" , start , "totalTime" , totalTime )
break pushLoop
}
if shardID != 0 {
if otherHeight , flag := txGen . IsSameHeight ( ) ; flag {
if otherHeight >= 1 {
go func ( ) {
readySignal <- uint32 ( shardID )
utils . GetLogInstance ( ) . Debug ( "Same blockchain height so readySignal generated" )
time . Sleep ( 3 * time . Second ) // wait for nodes to be ready
} ( )
}
}
}
select {
case shardID := <- readySignal :
lock := sync . Mutex { }
utils . GetLogInstance ( ) . Warn ( "STARTING TX GEN PUSH LOOP" , "gomaxprocs" , runtime . GOMAXPROCS ( 0 ) )
txs , err := GenerateSimulatedTransactionsAccount ( uint32 ( shardID ) , txGen , setting )
if err != nil {
utils . GetLogInstance ( ) . Debug ( "Error in Generating Txns" , "Err" , err )
}
lock . Lock ( )
SendTxsToShard ( txGen , txs , uint32 ( shardID ) )
lock . Unlock ( )
case <- time . After ( 10 * time . Second ) :
utils . GetLogInstance ( ) . Warn ( "No new block is received so far" )
}
}
}
// SendTxsToShard sends txs to shard, currently just to beacon shard
func SendTxsToShard ( clientNode * node . Node , txs types . Transactions , shardID uint32 ) {
msg := proto_node . ConstructTransactionListMessageAccount ( txs )
var err error
if shardID == 0 {
err = clientNode . GetHost ( ) . SendMessageToGroups ( [ ] p2p . GroupID { p2p . GroupIDBeaconClient } , p2p_host . ConstructP2pMessage ( byte ( 0 ) , msg ) )
} else {
clientGroup := p2p . NewClientGroupIDByShardID ( p2p . ShardID ( shardID ) )
err = clientNode . GetHost ( ) . SendMessageToGroups ( [ ] p2p . GroupID { clientGroup } , p2p_host . ConstructP2pMessage ( byte ( 0 ) , msg ) )
}
if err != nil {
utils . GetLogInstance ( ) . Debug ( "Error in Sending Txns" , "Err" , err )
}
}
// GenerateSimulatedTransactionsAccount generates simulated transaction for account model.
func GenerateSimulatedTransactionsAccount ( shardID uint32 , node * node . Node , setting Settings ) ( types . Transactions , error ) {
TxnsToGenerate := setting . MaxNumTxsPerBatch // TODO: make use of settings
txs := make ( [ ] * types . Transaction , TxnsToGenerate )
rounds := ( TxnsToGenerate / 100 )
remainder := TxnsToGenerate % 100
for i := 0 ; i < 100 ; i ++ {
baseNonce := node . Worker . GetCurrentState ( ) . GetNonce ( crypto . PubkeyToAddress ( node . TestBankKeys [ i ] . PublicKey ) )
for j := 0 ; j < rounds ; j ++ {
randomUserAddress := crypto . PubkeyToAddress ( node . TestBankKeys [ rand . Intn ( 100 ) ] . PublicKey )
randAmount := rand . Float32 ( )
tx , _ := types . SignTx ( types . NewTransaction ( baseNonce + uint64 ( j ) , randomUserAddress , shardID , big . NewInt ( int64 ( denominations . One * randAmount ) ) , params . TxGas , nil , nil ) , types . HomesteadSigner { } , node . TestBankKeys [ i ] )
txs [ 100 * j + i ] = tx
}
if i < remainder {
randomUserAddress := crypto . PubkeyToAddress ( node . TestBankKeys [ rand . Intn ( 100 ) ] . PublicKey )
randAmount := rand . Float32 ( )
tx , _ := types . SignTx ( types . NewTransaction ( baseNonce + uint64 ( rounds ) , randomUserAddress , shardID , big . NewInt ( int64 ( denominations . One * randAmount ) ) , params . TxGas , nil , nil ) , types . HomesteadSigner { } , node . TestBankKeys [ i ] )
txs [ 100 * rounds + i ] = tx
}
}
return txs , nil
}
func isDurationForever ( duration float64 ) bool {
return duration <= 0
}