Merge branch 'master' into sync

pull/69/head
Minh Doan 6 years ago
commit bff307484d
  1. 10
      benchmark.go
  2. 54
      blockchain/utxopool.go
  3. 4
      blockchain/utxopool_test.go
  4. 125
      client/txgen/main.go
  5. 72
      consensus/consensus_leader.go
  6. 3
      node/node.go
  7. 4
      node/node_handler.go
  8. 48
      profiler/profiler.go

@ -50,6 +50,7 @@ func main() {
attackedMode := flag.Int("attacked_mode", 0, "0 means not attacked, 1 means attacked, 2 means being open to be selected as attacked") attackedMode := flag.Int("attacked_mode", 0, "0 means not attacked, 1 means attacked, 2 means being open to be selected as attacked")
dbSupported := flag.Bool("db_supported", false, "false means not db_supported, true means db_supported") dbSupported := flag.Bool("db_supported", false, "false means not db_supported, true means db_supported")
profile := flag.Bool("profile", false, "Turn on profiling (CPU, Memory).") profile := flag.Bool("profile", false, "Turn on profiling (CPU, Memory).")
metricsReportURL := flag.String("metrics_profile_url", "", "If set, reports metrics to this URL.")
flag.Parse() flag.Parse()
// Set up randomization seed. // Set up randomization seed.
@ -90,9 +91,12 @@ func main() {
consensus := consensus.NewConsensus(*ip, *port, shardID, peers, leader) consensus := consensus.NewConsensus(*ip, *port, shardID, peers, leader)
// Start Profiler for leader if profile argument is on // Start Profiler for leader if profile argument is on
if *profile && role == "leader" { if role == "leader" && (*profile || *metricsReportURL != "") {
profiler := profiler.NewProfiler(consensus.Log, os.Getpid(), shardID) prof := profiler.GetProfiler()
profiler.Start() prof.Config(consensus.Log, shardID, *metricsReportURL)
if *profile {
prof.Start()
}
} }
// Set logger to attack model. // Set logger to attack model.

@ -4,6 +4,7 @@ import (
"bytes" "bytes"
"encoding/gob" "encoding/gob"
"encoding/hex" "encoding/hex"
"errors"
"fmt" "fmt"
"github.com/dedis/kyber/sign/schnorr" "github.com/dedis/kyber/sign/schnorr"
"github.com/simple-rules/harmony-benchmark/crypto" "github.com/simple-rules/harmony-benchmark/crypto"
@ -77,7 +78,7 @@ func (utxoPool *UTXOPool) VerifyTransactions(transactions []*Transaction) bool {
spentTXOs := make(map[[20]byte]map[string]map[uint32]bool) spentTXOs := make(map[[20]byte]map[string]map[uint32]bool)
if utxoPool != nil { if utxoPool != nil {
for _, tx := range transactions { for _, tx := range transactions {
if valid, crossShard := utxoPool.VerifyOneTransaction(tx, &spentTXOs); !crossShard && !valid { if err, crossShard := utxoPool.VerifyOneTransaction(tx, &spentTXOs); !crossShard && err != nil {
return false return false
} }
} }
@ -114,7 +115,7 @@ func (utxoPool *UTXOPool) VerifyStateBlock(stateBlock *Block) bool {
} }
// VerifyOneTransaction verifies if a list of transactions valid. // VerifyOneTransaction verifies if a list of transactions valid.
func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction, spentTXOs *map[[20]byte]map[string]map[uint32]bool) (valid, crossShard bool) { func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction, spentTXOs *map[[20]byte]map[string]map[uint32]bool) (err error, crossShard bool) {
if len(tx.Proofs) != 0 { if len(tx.Proofs) != 0 {
return utxoPool.VerifyUnlockTransaction(tx) return utxoPool.VerifyUnlockTransaction(tx)
} }
@ -133,10 +134,10 @@ func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction, spentTXOs *map[[
inTxID := hex.EncodeToString(in.PreviousOutPoint.TxID[:]) inTxID := hex.EncodeToString(in.PreviousOutPoint.TxID[:])
index := in.PreviousOutPoint.Index index := in.PreviousOutPoint.Index
// Check if the transaction with the addres is spent or not. // Check if the transaction with the address is spent or not.
if val, ok := (*spentTXOs)[in.Address][inTxID][index]; ok { if val, ok := (*spentTXOs)[in.Address][inTxID][index]; ok {
if val { if val {
return false, crossShard return errors.New("TxInput is already spent"), crossShard
} }
} }
// Mark the transactions with the address and index spent. // Mark the transactions with the address and index spent.
@ -154,7 +155,7 @@ func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction, spentTXOs *map[[
inTotal += val inTotal += val
} else { } else {
utxoPool.mutex.Unlock() utxoPool.mutex.Unlock()
return false, crossShard return errors.New("Specified TxInput does not exist in utxo pool"), crossShard
} }
utxoPool.mutex.Unlock() utxoPool.mutex.Unlock()
} }
@ -171,30 +172,30 @@ func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction, spentTXOs *map[[
// TODO: improve this checking logic // TODO: improve this checking logic
if (crossShard && inTotal > outTotal) || (!crossShard && inTotal != outTotal) { if (crossShard && inTotal > outTotal) || (!crossShard && inTotal != outTotal) {
return false, crossShard return errors.New("Input and output amount doesn't match"), crossShard
} }
if inTotal == 0 { if inTotal == 0 {
return false, false // Here crossShard is false, because if there is no business for this shard, it's effectively not crossShard no matter what. return errors.New("Input amount is 0"), false // Here crossShard is false, because if there is no business for this shard, it's effectively not crossShard no matter what.
} }
// Verify the signature // Verify the signature
pubKey := crypto.Ed25519Curve.Point() pubKey := crypto.Ed25519Curve.Point()
err := pubKey.UnmarshalBinary(tx.PublicKey[:]) tempErr := pubKey.UnmarshalBinary(tx.PublicKey[:])
if err != nil { if tempErr != nil {
log.Error("Failed to deserialize public key", "error", err) log.Error("Failed to deserialize public key", "error", tempErr)
} }
err = schnorr.Verify(crypto.Ed25519Curve, pubKey, tx.GetContentToVerify(), tx.Signature[:]) tempErr = schnorr.Verify(crypto.Ed25519Curve, pubKey, tx.GetContentToVerify(), tx.Signature[:])
if err != nil { if tempErr != nil {
log.Error("Failed to verify signature", "error", err, "public key", pubKey, "pubKey in bytes", tx.PublicKey[:]) log.Error("Failed to verify signature", "error", tempErr, "public key", pubKey, "pubKey in bytes", tx.PublicKey[:])
return false, crossShard return errors.New("Invalid signature"), crossShard
} }
return true, crossShard return nil, crossShard
} }
// Verify a cross shard transaction that contains proofs for unlock-to-commit/abort. // Verify a cross shard transaction that contains proofs for unlock-to-commit/abort.
func (utxoPool *UTXOPool) VerifyUnlockTransaction(tx *Transaction) (valid, crossShard bool) { func (utxoPool *UTXOPool) VerifyUnlockTransaction(tx *Transaction) (err error, crossShard bool) {
valid = true err = nil
crossShard = false // unlock transaction is treated as crossShard=false because it will be finalized now (doesn't need more steps) crossShard = false // unlock transaction is treated as crossShard=false because it will be finalized now (doesn't need more steps)
txInputs := make(map[TXInput]bool) txInputs := make(map[TXInput]bool)
for _, curProof := range tx.Proofs { for _, curProof := range tx.Proofs {
@ -205,7 +206,7 @@ func (utxoPool *UTXOPool) VerifyUnlockTransaction(tx *Transaction) (valid, cross
for _, txInput := range tx.TxInput { for _, txInput := range tx.TxInput {
val, ok := txInputs[txInput] val, ok := txInputs[txInput]
if !ok || !val { if !ok || !val {
valid = false err = errors.New("Invalid unlock transaction: not all proofs are provided for tx inputs")
} }
} }
return return
@ -346,7 +347,7 @@ func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
// VerifyOneTransactionAndUpdate verifies and update a valid transaction. // VerifyOneTransactionAndUpdate verifies and update a valid transaction.
// Return false if the transaction is not valid. // Return false if the transaction is not valid.
func (utxoPool *UTXOPool) VerifyOneTransactionAndUpdate(tx *Transaction) bool { func (utxoPool *UTXOPool) VerifyOneTransactionAndUpdate(tx *Transaction) bool {
if valid, _ := utxoPool.VerifyOneTransaction(tx, nil); valid { if err, _ := utxoPool.VerifyOneTransaction(tx, nil); err == nil {
utxoPool.UpdateOneTransaction(tx) utxoPool.UpdateOneTransaction(tx)
return true return true
} }
@ -385,29 +386,28 @@ func CreateUTXOPoolFromGenesisBlockChain(bc *Blockchain) *UTXOPool {
} }
// SelectTransactionsForNewBlock returns a list of index of valid transactions for the new block. // SelectTransactionsForNewBlock returns a list of index of valid transactions for the new block.
func (utxoPool *UTXOPool) SelectTransactionsForNewBlock(transactions []*Transaction, maxNumTxs int) ([]*Transaction, []*Transaction, []*CrossShardTxAndProof) { func (utxoPool *UTXOPool) SelectTransactionsForNewBlock(transactions []*Transaction, maxNumTxs int) ([]*Transaction, []*Transaction, []*Transaction, []*CrossShardTxAndProof) {
selected, unselected, crossShardTxs := []*Transaction{}, []*Transaction{}, []*CrossShardTxAndProof{} selected, unselected, invalid, crossShardTxs := []*Transaction{}, []*Transaction{}, []*Transaction{}, []*CrossShardTxAndProof{}
spentTXOs := make(map[[20]byte]map[string]map[uint32]bool) spentTXOs := make(map[[20]byte]map[string]map[uint32]bool)
for _, tx := range transactions { for _, tx := range transactions {
valid, crossShard := utxoPool.VerifyOneTransaction(tx, &spentTXOs) err, crossShard := utxoPool.VerifyOneTransaction(tx, &spentTXOs)
if len(selected) < maxNumTxs { if len(selected) < maxNumTxs {
if valid || crossShard { if err == nil || crossShard {
selected = append(selected, tx) selected = append(selected, tx)
if crossShard { if crossShard {
proof := CrossShardTxProof{Accept: valid, TxID: tx.ID, TxInput: getShardTxInput(tx, utxoPool.ShardID)} proof := CrossShardTxProof{Accept: err == nil, TxID: tx.ID, TxInput: getShardTxInput(tx, utxoPool.ShardID)}
txAndProof := CrossShardTxAndProof{tx, &proof} txAndProof := CrossShardTxAndProof{tx, &proof}
crossShardTxs = append(crossShardTxs, &txAndProof) crossShardTxs = append(crossShardTxs, &txAndProof)
} }
} else { } else {
unselected = append(unselected, tx) invalid = append(invalid, tx)
} }
} else { } else {
// TODO: discard invalid transactions
unselected = append(unselected, tx) unselected = append(unselected, tx)
} }
} }
return selected, unselected, crossShardTxs return selected, unselected, invalid, crossShardTxs
} }
func getShardTxInput(transaction *Transaction, shardID uint32) []TXInput { func getShardTxInput(transaction *Transaction, shardID uint32) []TXInput {

@ -16,7 +16,7 @@ func TestVerifyOneTransactionAndUpdate(t *testing.T) {
t.Error("failed to create a new transaction.") t.Error("failed to create a new transaction.")
} }
if valid, _ := utxoPool.VerifyOneTransaction(tx, nil); !valid { if err, _ := utxoPool.VerifyOneTransaction(tx, nil); err != nil {
t.Error("failed to verify a valid transaction.") t.Error("failed to verify a valid transaction.")
} }
utxoPool.VerifyOneTransactionAndUpdate(tx) utxoPool.VerifyOneTransactionAndUpdate(tx)
@ -35,7 +35,7 @@ func TestVerifyOneTransactionFail(t *testing.T) {
} }
tx.TxInput = append(tx.TxInput, tx.TxInput[0]) tx.TxInput = append(tx.TxInput, tx.TxInput[0])
if valid, _ := utxoPool.VerifyOneTransaction(tx, nil); valid { if err, _ := utxoPool.VerifyOneTransaction(tx, nil); err == nil {
t.Error("Tx with multiple identical TxInput shouldn't be valid") t.Error("Tx with multiple identical TxInput shouldn't be valid")
} }
} }

@ -1,6 +1,7 @@
package main package main
import ( import (
"encoding/binary"
"encoding/hex" "encoding/hex"
"flag" "flag"
"fmt" "fmt"
@ -57,12 +58,13 @@ type TxInfo struct {
// token (1000) to each address in [0 - N). See node.AddTestingAddresses() // token (1000) to each address in [0 - N). See node.AddTestingAddresses()
// //
// Params: // Params:
// subsetId - the which subset of the utxo to work on (used to select addresses)
// shardID - the shardID for current shard // shardID - the shardID for current shard
// dataNodes - nodes containing utxopools of all shards // dataNodes - nodes containing utxopools of all shards
// Returns: // Returns:
// all single-shard txs // all single-shard txs
// all cross-shard txs // all cross-shard txs
func generateSimulatedTransactions(shardID int, dataNodes []*node.Node) ([]*blockchain.Transaction, []*blockchain.Transaction) { func generateSimulatedTransactions(subsetId, numSubset int, shardID int, dataNodes []*node.Node) ([]*blockchain.Transaction, []*blockchain.Transaction) {
/* /*
UTXO map structure: UTXO map structure:
address - [ address - [
@ -86,33 +88,32 @@ func generateSimulatedTransactions(shardID int, dataNodes []*node.Node) ([]*bloc
UTXOLOOP: UTXOLOOP:
// Loop over all addresses // Loop over all addresses
for address, txMap := range dataNodes[shardID].UtxoPool.UtxoMap { for address, txMap := range dataNodes[shardID].UtxoPool.UtxoMap {
txInfo.address = address if int(binary.BigEndian.Uint32(address[:]))%numSubset == subsetId%numSubset { // Work on one subset of utxo at a time
// Loop over all txIds for the address txInfo.address = address
for txIdStr, utxoMap := range txMap { // Loop over all txIds for the address
// Parse TxId for txIdStr, utxoMap := range txMap {
id, err := hex.DecodeString(txIdStr) // Parse TxId
if err != nil { id, err := hex.DecodeString(txIdStr)
continue if err != nil {
}
copy(txInfo.id[:], id[:])
// Loop over all utxos for the txId
for index, value := range utxoMap {
txInfo.index = index
txInfo.value = value
randNum := rand.Intn(100)
// 30% sample rate to select UTXO to use for new transactions
if randNum >= 30 {
continue continue
} }
if setting.crossShard && randNum < 10 { // 1/3 cross shard transactions: add another txinput from another shard copy(txInfo.id[:], id[:])
generateCrossShardTx(&txInfo)
} else { // Loop over all utxos for the txId
generateSingleShardTx(&txInfo) for index, value := range utxoMap {
} txInfo.index = index
if txInfo.txCount >= setting.maxNumTxsPerBatch { txInfo.value = value
break UTXOLOOP
randNum := rand.Intn(100)
if setting.crossShard && randNum < 30 { // 1/3 cross shard transactions: add another txinput from another shard
generateCrossShardTx(&txInfo)
} else {
generateSingleShardTx(&txInfo)
}
if txInfo.txCount >= setting.maxNumTxsPerBatch {
break UTXOLOOP
}
} }
} }
} }
@ -125,8 +126,8 @@ UTXOLOOP:
func generateCrossShardTx(txInfo *TxInfo) { func generateCrossShardTx(txInfo *TxInfo) {
nodeShardID := txInfo.dataNodes[txInfo.shardID].Consensus.ShardID nodeShardID := txInfo.dataNodes[txInfo.shardID].Consensus.ShardID
// shard with neighboring Id // a random shard to spend money to
crossShardId := (int(nodeShardID) + 1) % len(txInfo.dataNodes) crossShardId := rand.Intn(len(txInfo.dataNodes))
crossShardNode := txInfo.dataNodes[crossShardId] crossShardNode := txInfo.dataNodes[crossShardId]
crossShardUtxosMap := crossShardNode.UtxoPool.UtxoMap[txInfo.address] crossShardUtxosMap := crossShardNode.UtxoPool.UtxoMap[txInfo.address]
@ -241,6 +242,8 @@ func main() {
configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config") configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config")
maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 100000, "number of transactions to send per message") maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 100000, "number of transactions to send per message")
logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution") logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
numSubset := flag.Int("numSubset", 3, "the number of subsets of utxos to process separately")
duration := flag.Int("duration", 60, "duration of the tx generation in second")
flag.Parse() flag.Parse()
// Read the configs // Read the configs
@ -289,6 +292,7 @@ func main() {
// Add it to blockchain // Add it to blockchain
utxoPoolMutex.Lock() utxoPoolMutex.Lock()
node.AddNewBlock(block) node.AddNewBlock(block)
node.UpdateUtxoAndState(block)
utxoPoolMutex.Unlock() utxoPoolMutex.Unlock()
} else { } else {
continue continue
@ -302,49 +306,25 @@ func main() {
go func() { go func() {
clientNode.StartServer(clientPort) clientNode.StartServer(clientPort)
}() }()
} }
// Transaction generation process // Transaction generation process
time.Sleep(10 * time.Second) // wait for nodes to be ready time.Sleep(10 * time.Second) // wait for nodes to be ready
start := time.Now() start := time.Now()
totalTime := 60.0 //run for 1 minutes totalTime := float64(*duration)
batchCounter := 0
for true { for true {
t := time.Now() t := time.Now()
if t.Sub(start).Seconds() >= totalTime { if t.Sub(start).Seconds() >= totalTime {
log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime) log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime)
break break
} }
for shardId := 0; shardId < len(nodes); shardId++ {
allCrossTxs := []*blockchain.Transaction{} constructAndSendTransaction(batchCounter, *numSubset, shardId, leaders, nodes, clientNode, clientPort)
// Generate simulated transactions
for i, leader := range leaders {
txs, crossTxs := generateSimulatedTransactions(i, nodes)
allCrossTxs = append(allCrossTxs, crossTxs...)
log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs), "numCrossTxs", len(crossTxs))
msg := proto_node.ConstructTransactionListMessage(txs)
p2p.SendMessage(leader, msg)
// Note cross shard txs are later sent in batch
}
if len(allCrossTxs) > 0 {
log.Debug("[Generator] Broadcasting cross-shard txs ...", "allCrossTxs", len(allCrossTxs))
msg := proto_node.ConstructTransactionListMessage(allCrossTxs)
p2p.BroadcastMessage(leaders, msg)
// Put cross shard tx into a pending list waiting for proofs from leaders
if clientPort != "" {
clientNode.Client.PendingCrossTxsMutex.Lock()
for _, tx := range allCrossTxs {
clientNode.Client.PendingCrossTxs[tx.ID] = tx
}
clientNode.Client.PendingCrossTxsMutex.Unlock()
}
} }
batchCounter++
time.Sleep(500 * time.Millisecond) // Send a batch of transactions periodically time.Sleep(1000 * time.Millisecond)
} }
// Send a stop message to stop the nodes at the end // Send a stop message to stop the nodes at the end
@ -352,3 +332,32 @@ func main() {
peers := append(config.GetValidators(), leaders...) peers := append(config.GetValidators(), leaders...)
p2p.BroadcastMessage(peers, msg) p2p.BroadcastMessage(peers, msg)
} }
func constructAndSendTransaction(subsetId, numSubset, shardId int, leaders []p2p.Peer, nodes []*node.Node, clientNode *node.Node, clientPort string) {
allCrossTxs := []*blockchain.Transaction{}
// Generate simulated transactions
leader := leaders[shardId]
txs, crossTxs := generateSimulatedTransactions(subsetId, numSubset, shardId, nodes)
allCrossTxs = append(allCrossTxs, crossTxs...)
log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs), "numCrossTxs", len(crossTxs))
msg := proto_node.ConstructTransactionListMessage(txs)
p2p.SendMessage(leader, msg)
// Note cross shard txs are later sent in batch
if len(allCrossTxs) > 0 {
log.Debug("[Generator] Broadcasting cross-shard txs ...", "allCrossTxs", len(allCrossTxs))
msg := proto_node.ConstructTransactionListMessage(allCrossTxs)
p2p.BroadcastMessage(leaders, msg)
// Put cross shard tx into a pending list waiting for proofs from leaders
if clientPort != "" {
clientNode.Client.PendingCrossTxsMutex.Lock()
for _, tx := range allCrossTxs {
clientNode.Client.PendingCrossTxs[tx.ID] = tx
}
clientNode.Client.PendingCrossTxsMutex.Unlock()
}
}
}

@ -6,11 +6,12 @@ import (
"encoding/gob" "encoding/gob"
"encoding/hex" "encoding/hex"
"errors" "errors"
"net/http"
"net/url" "net/url"
"strconv" "strconv"
"time" "time"
"github.com/simple-rules/harmony-benchmark/profiler"
"github.com/dedis/kyber" "github.com/dedis/kyber"
"github.com/dedis/kyber/sign/schnorr" "github.com/dedis/kyber/sign/schnorr"
"github.com/simple-rules/harmony-benchmark/blockchain" "github.com/simple-rules/harmony-benchmark/blockchain"
@ -404,41 +405,42 @@ func (consensus *Consensus) verifyResponse(commitments *map[uint16]kyber.Point,
} }
func (consensus *Consensus) reportMetrics(block blockchain.Block) { func (consensus *Consensus) reportMetrics(block blockchain.Block) {
if !block.IsStateBlock() { // Skip state block stats if block.IsStateBlock() { // Skip state block stats
endTime := time.Now() return
timeElapsed := endTime.Sub(startTime) }
numOfTxs := block.NumTransactions
tps := float64(numOfTxs) / timeElapsed.Seconds()
consensus.Log.Info("TPS Report",
"numOfTXs", numOfTxs,
"startTime", startTime,
"endTime", endTime,
"timeElapsed", timeElapsed,
"TPS", tps,
"consensus", consensus)
// Post metrics
URL := "http://localhost:3000/report"
txHashes := []string{}
for i := 1; i <= 3; i++ {
if len(block.TransactionIds)-i >= 0 {
txHashes = append(txHashes, hex.EncodeToString(block.TransactionIds[len(block.TransactionIds)-i][:]))
}
}
form := url.Values{
"key": {consensus.pubKey.String()},
"tps": {strconv.FormatFloat(tps, 'f', 2, 64)},
"txCount": {strconv.Itoa(int(numOfTxs))},
"nodeCount": {strconv.Itoa(len(consensus.validators) + 1)},
"latestBlockHash": {hex.EncodeToString(consensus.blockHash[:])},
"latestTxHashes": txHashes,
"blockLatency": {strconv.Itoa(int(timeElapsed / time.Millisecond))},
}
body := bytes.NewBufferString(form.Encode()) endTime := time.Now()
rsp, err := http.Post(URL, "application/x-www-form-urlencoded", body) timeElapsed := endTime.Sub(startTime)
if err == nil { numOfTxs := block.NumTransactions
defer rsp.Body.Close() tps := float64(numOfTxs) / timeElapsed.Seconds()
consensus.Log.Info("TPS Report",
"numOfTXs", numOfTxs,
"startTime", startTime,
"endTime", endTime,
"timeElapsed", timeElapsed,
"TPS", tps,
"consensus", consensus)
// Post metrics
profiler := profiler.GetProfiler()
if profiler.MetricsReportURL == "" {
return
}
txHashes := []string{}
for i := 1; i <= 3; i++ {
if len(block.TransactionIds)-i >= 0 {
txHashes = append(txHashes, hex.EncodeToString(block.TransactionIds[len(block.TransactionIds)-i][:]))
} }
} }
metrics := url.Values{
"key": {consensus.pubKey.String()},
"tps": {strconv.FormatFloat(tps, 'f', 2, 64)},
"txCount": {strconv.Itoa(int(numOfTxs))},
"nodeCount": {strconv.Itoa(len(consensus.validators) + 1)},
"latestBlockHash": {hex.EncodeToString(consensus.blockHash[:])},
"latestTxHashes": txHashes,
"blockLatency": {strconv.Itoa(int(timeElapsed / time.Millisecond))},
}
profiler.LogMetrics(metrics)
} }

@ -55,7 +55,8 @@ func (node *Node) addPendingTransactions(newTxs []*blockchain.Transaction) {
// Note the pending transaction list will then contain the rest of the txs // Note the pending transaction list will then contain the rest of the txs
func (node *Node) getTransactionsForNewBlock(maxNumTxs int) ([]*blockchain.Transaction, []*blockchain.CrossShardTxAndProof) { func (node *Node) getTransactionsForNewBlock(maxNumTxs int) ([]*blockchain.Transaction, []*blockchain.CrossShardTxAndProof) {
node.pendingTxMutex.Lock() node.pendingTxMutex.Lock()
selected, unselected, crossShardTxs := node.UtxoPool.SelectTransactionsForNewBlock(node.pendingTransactions, maxNumTxs) selected, unselected, invalid, crossShardTxs := node.UtxoPool.SelectTransactionsForNewBlock(node.pendingTransactions, maxNumTxs)
_ = invalid // invalid txs are discard
node.pendingTransactions = unselected node.pendingTransactions = unselected
node.pendingTxMutex.Unlock() node.pendingTxMutex.Unlock()
return selected, crossShardTxs return selected, crossShardTxs

@ -18,9 +18,9 @@ import (
const ( const (
// The max number of transaction per a block. // The max number of transaction per a block.
MaxNumberOfTransactionsPerBlock = 3000 MaxNumberOfTransactionsPerBlock = 10000
// The number of blocks allowed before generating state block // The number of blocks allowed before generating state block
NumBlocksBeforeStateBlock = 10 NumBlocksBeforeStateBlock = 100
) )
// NodeHandler handles a new incoming connection. // NodeHandler handles a new incoming connection.

@ -1,6 +1,11 @@
package profiler package profiler
import ( import (
"bytes"
"net/http"
"net/url"
"os"
"sync"
"time" "time"
"github.com/shirou/gopsutil/process" "github.com/shirou/gopsutil/process"
@ -8,15 +13,30 @@ import (
) )
type Profiler struct { type Profiler struct {
logger log.Logger // parameters
PID int32 logger log.Logger
ShardID string pid int32
proc *process.Process shardID string
MetricsReportURL string
// Internal
proc *process.Process
} }
func NewProfiler(logger log.Logger, pid int, shardID string) *Profiler { var singleton *Profiler
profiler := Profiler{logger, int32(pid), shardID, nil} var once sync.Once
return &profiler
func GetProfiler() *Profiler {
once.Do(func() {
singleton = &Profiler{}
})
return singleton
}
func (profiler *Profiler) Config(logger log.Logger, shardID string, metricsReportURL string) {
profiler.logger = logger
profiler.pid = int32(os.Getpid())
profiler.shardID = shardID
profiler.MetricsReportURL = metricsReportURL
} }
func (profiler *Profiler) LogMemory() { func (profiler *Profiler) LogMemory() {
@ -24,7 +44,7 @@ func (profiler *Profiler) LogMemory() {
// log mem usage // log mem usage
info, _ := profiler.proc.MemoryInfo() info, _ := profiler.proc.MemoryInfo()
memMap, _ := profiler.proc.MemoryMaps(false) memMap, _ := profiler.proc.MemoryMaps(false)
profiler.logger.Info("Mem Report", "info", info, "map", memMap, "shardID", profiler.ShardID) profiler.logger.Info("Mem Report", "info", info, "map", memMap, "shardID", profiler.shardID)
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
} }
@ -35,14 +55,22 @@ func (profiler *Profiler) LogCPU() {
// log cpu usage // log cpu usage
percent, _ := profiler.proc.CPUPercent() percent, _ := profiler.proc.CPUPercent()
times, _ := profiler.proc.Times() times, _ := profiler.proc.Times()
profiler.logger.Info("CPU Report", "percent", percent, "times", times, "shardID", profiler.ShardID) profiler.logger.Info("CPU Report", "percent", percent, "times", times, "shardID", profiler.shardID)
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
} }
} }
func (profiler *Profiler) LogMetrics(metrics url.Values) {
body := bytes.NewBufferString(metrics.Encode())
rsp, err := http.Post(profiler.MetricsReportURL, "application/x-www-form-urlencoded", body)
if err == nil {
defer rsp.Body.Close()
}
}
func (profiler *Profiler) Start() { func (profiler *Profiler) Start() {
profiler.proc, _ = process.NewProcess(profiler.PID) profiler.proc, _ = process.NewProcess(profiler.pid)
go profiler.LogCPU() go profiler.LogCPU()
go profiler.LogMemory() go profiler.LogMemory()
} }

Loading…
Cancel
Save