Merge pull request #99 from harmony-one/rj_branch

[HAR-21] Add simulated transactions and integrate into txgen
pull/103/head
Rongjian Lan 6 years ago committed by GitHub
commit 9a3b178ff4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      benchmark.go
  2. 395
      client/txgen/main.go
  3. 32
      client/txgen/txgen/account_txs_generator.go
  4. 212
      client/txgen/txgen/utxo_txs_generator.go
  5. 1
      consensus/consensus_leader.go
  6. 14
      core/blockchain.go
  7. 3
      core/genesis.go
  8. 8
      discovery/discovery.go
  9. 2
      harmony/main.go
  10. 72
      node/node.go
  11. 183
      node/node_handler.go
  12. 39
      node/worker/worker.go
  13. 18
      proto/node/node.go
  14. 13
      trie/database.go

@ -74,7 +74,7 @@ func loggingInit(logFolder, role, ip, port string, onlyLogTps bool) {
} }
func main() { func main() {
accountModel := flag.Bool("account_model", false, "Whether to use account model") accountModel := flag.Bool("account_model", true, "Whether to use account model")
// TODO: use http://getmyipaddress.org/ or http://www.get-myip.com/ to retrieve my IP address // TODO: use http://getmyipaddress.org/ or http://www.get-myip.com/ to retrieve my IP address
ip := flag.String("ip", "127.0.0.1", "IP of the node") ip := flag.String("ip", "127.0.0.1", "IP of the node")
port := flag.String("port", "9000", "port of the node.") port := flag.String("port", "9000", "port of the node.")
@ -113,15 +113,14 @@ func main() {
var leader p2p.Peer var leader p2p.Peer
var selfPeer p2p.Peer var selfPeer p2p.Peer
var clientPeer *p2p.Peer var clientPeer *p2p.Peer
// Use Peer Discovery to get shard/leader/peer/... // Use Peer Discovery to get shard/leader/peer/...
priKey, pubKey := utils.GenKey(*ip, *port)
if *peerDisvoery { if *peerDisvoery {
pubKey, priKey := utils.GenKey(*ip, *port)
// Contact Identity Chain // Contact Identity Chain
// This is a blocking call // This is a blocking call
// Assume @ak has get it working // Assume @ak has get it working
// TODO: this has to work with @ak's fix // TODO: this has to work with @ak's fix
discoveryConfig := discovery.New(pubKey, priKey) discoveryConfig := discovery.New(priKey, pubKey)
err := discoveryConfig.StartClientMode(*idcIP, *idcPort) err := discoveryConfig.StartClientMode(*idcIP, *idcPort)
if err != nil { if err != nil {
@ -144,6 +143,7 @@ func main() {
// Create client peer. // Create client peer.
clientPeer = distributionConfig.GetClientPeer() clientPeer = distributionConfig.GetClientPeer()
} }
selfPeer.PubKey = pubKey
var role string var role string
if leader.Ip == *ip && leader.Port == *port { if leader.Ip == *ip && leader.Port == *port {

@ -1,11 +1,11 @@
package main package main
import ( import (
"encoding/binary"
"encoding/hex"
"flag" "flag"
"fmt" "fmt"
"math/rand" "github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/client/txgen/txgen"
"github.com/harmony-one/harmony/core/types"
"os" "os"
"path" "path"
"runtime" "runtime"
@ -16,7 +16,6 @@ import (
"github.com/harmony-one/harmony/client" "github.com/harmony-one/harmony/client"
client_config "github.com/harmony-one/harmony/client/config" client_config "github.com/harmony-one/harmony/client/config"
"github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/consensus"
"github.com/harmony-one/harmony/crypto/pki"
"github.com/harmony-one/harmony/log" "github.com/harmony-one/harmony/log"
"github.com/harmony-one/harmony/node" "github.com/harmony-one/harmony/node"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
@ -24,235 +23,25 @@ import (
) )
var ( var (
version string version string
builtBy string builtBy string
builtAt string builtAt string
commit string commit string
)
type txGenSettings struct {
numOfAddress int
crossShard bool
maxNumTxsPerBatch int
crossShardRatio int
}
var (
utxoPoolMutex sync.Mutex utxoPoolMutex sync.Mutex
setting txGenSettings
) )
type TxInfo struct {
// Global Input
shardID int
dataNodes []*node.Node
// Temp Input
id [32]byte
index uint32
value int
address [20]byte
// Output
txs []*blockchain.Transaction
crossTxs []*blockchain.Transaction
txCount int
}
// Generates at most "maxNumTxs" number of simulated transactions based on the current UtxoPools of all shards.
// The transactions are generated by going through the existing utxos and
// randomly select a subset of them as the input for each new transaction. The output
// address of the new transaction are randomly selected from [0 - N), where N is the total number of fake addresses.
//
// When crossShard=true, besides the selected utxo input, select another valid utxo as input from the same address in a second shard.
// Similarly, generate another utxo output in that second shard.
//
// NOTE: the genesis block should contain N coinbase transactions which add
// token (1000) to each address in [0 - N). See node.AddTestingAddresses()
//
// Params:
// subsetId - the which subset of the utxo to work on (used to select addresses)
// shardID - the shardID for current shard
// dataNodes - nodes containing utxopools of all shards
// Returns:
// all single-shard txs
// all cross-shard txs
func generateSimulatedTransactions(subsetId, numSubset int, shardID int, dataNodes []*node.Node) ([]*blockchain.Transaction, []*blockchain.Transaction) {
/*
UTXO map structure:
address - [
txID1 - [
outputIndex1 - value1
outputIndex2 - value2
]
txID2 - [
outputIndex1 - value1
outputIndex2 - value2
]
]
*/
txInfo := TxInfo{}
txInfo.shardID = shardID
txInfo.dataNodes = dataNodes
txInfo.txCount = 0
UTXOLOOP:
// Loop over all addresses
for address, txMap := range dataNodes[shardID].UtxoPool.UtxoMap {
if int(binary.BigEndian.Uint32(address[:]))%numSubset == subsetId%numSubset { // Work on one subset of utxo at a time
txInfo.address = address
// Loop over all txIDs for the address
for txIDStr, utxoMap := range txMap {
// Parse TxId
id, err := hex.DecodeString(txIDStr)
if err != nil {
continue
}
copy(txInfo.id[:], id[:])
// Loop over all utxos for the txID
utxoSize := len(utxoMap)
batchSize := utxoSize / numSubset
i := subsetId % numSubset
counter := 0
for index, value := range utxoMap {
counter++
if batchSize*i < counter && counter > batchSize*(i+1) {
continue
}
txInfo.index = index
txInfo.value = value
randNum := rand.Intn(100)
subsetRatio := 100 // / numSubset
if randNum < subsetRatio { // Sample based on batch size
if setting.crossShard && randNum < subsetRatio*setting.crossShardRatio/100 { // 30% cross shard transactions: add another txinput from another shard
generateCrossShardTx(&txInfo)
} else {
generateSingleShardTx(&txInfo)
}
if txInfo.txCount >= setting.maxNumTxsPerBatch {
break UTXOLOOP
}
}
}
}
}
}
log.Info("UTXO CLIENT", "numUtxo", dataNodes[shardID].UtxoPool.CountNumOfUtxos(), "shardID", shardID)
log.Debug("[Generator] generated transations", "single-shard", len(txInfo.txs), "cross-shard", len(txInfo.crossTxs))
return txInfo.txs, txInfo.crossTxs
}
func generateCrossShardTx(txInfo *TxInfo) {
nodeShardID := txInfo.dataNodes[txInfo.shardID].Consensus.ShardID
crossShardID := nodeShardID
// a random shard to spend money to
for {
crossShardID = uint32(rand.Intn(len(txInfo.dataNodes)))
if crossShardID != nodeShardID {
break
}
}
//crossShardNode := txInfo.dataNodes[crossShardID]
//crossShardUtxosMap := crossShardNode.UtxoPool.UtxoMap[txInfo.address]
//
//// Get the cross shard utxo from another shard
//var crossTxin *blockchain.TXInput
//crossUtxoValue := 0
//// Loop over utxos for the same address from the other shard and use the first utxo as the second cross tx input
//for crossTxIdStr, crossShardUtxos := range crossShardUtxosMap {
// // Parse TxId
// id, err := hex.DecodeString(crossTxIdStr)
// if err != nil {
// continue
// }
// crossTxId := [32]byte{}
// copy(crossTxId[:], id[:])
//
// for crossShardIndex, crossShardValue := range crossShardUtxos {
// crossUtxoValue = crossShardValue
// crossTxin = blockchain.NewTXInput(blockchain.NewOutPoint(&crossTxId, crossShardIndex), txInfo.address, crossShardID)
// break
// }
// if crossTxin != nil {
// break
// }
//}
// Add the utxo from current shard
txIn := blockchain.NewTXInput(blockchain.NewOutPoint(&txInfo.id, txInfo.index), txInfo.address, nodeShardID)
txInputs := []blockchain.TXInput{*txIn}
// Add the utxo from the other shard, if any
//if crossTxin != nil { // This means the ratio of cross shard tx could be lower than 1/3
// txInputs = append(txInputs, *crossTxin)
//}
// Spend the utxo from the current shard to a random address in [0 - N)
txout := blockchain.TXOutput{Amount: txInfo.value, Address: pki.GetAddressFromInt(rand.Intn(setting.numOfAddress) + 1), ShardID: crossShardID}
txOutputs := []blockchain.TXOutput{txout}
// Spend the utxo from the other shard, if any, to a random address in [0 - N)
//if crossTxin != nil {
// crossTxout := blockchain.TXOutput{Amount: crossUtxoValue, Address: pki.GetAddressFromInt(rand.Intn(setting.numOfAddress) + 1), ShardID: crossShardID}
// txOutputs = append(txOutputs, crossTxout)
//}
// Construct the new transaction
tx := blockchain.Transaction{ID: [32]byte{}, TxInput: txInputs, TxOutput: txOutputs, Proofs: nil}
priKeyInt, ok := client.LookUpIntPriKey(txInfo.address)
if ok {
tx.PublicKey = pki.GetBytesFromPublicKey(pki.GetPublicKeyFromScalar(pki.GetPrivateKeyScalarFromInt(priKeyInt)))
tx.SetID() // TODO(RJ): figure out the correct way to set Tx ID.
tx.Sign(pki.GetPrivateKeyScalarFromInt(priKeyInt))
} else {
log.Error("Failed to look up the corresponding private key from address", "Address", txInfo.address)
return
}
txInfo.crossTxs = append(txInfo.crossTxs, &tx)
txInfo.txCount++
}
func generateSingleShardTx(txInfo *TxInfo) {
nodeShardID := txInfo.dataNodes[txInfo.shardID].Consensus.ShardID
// Add the utxo as new tx input
txin := blockchain.NewTXInput(blockchain.NewOutPoint(&txInfo.id, txInfo.index), txInfo.address, nodeShardID)
// Spend the utxo to a random address in [0 - N)
txout := blockchain.TXOutput{Amount: txInfo.value, Address: pki.GetAddressFromInt(rand.Intn(setting.numOfAddress) + 1), ShardID: nodeShardID}
tx := blockchain.Transaction{ID: [32]byte{}, TxInput: []blockchain.TXInput{*txin}, TxOutput: []blockchain.TXOutput{txout}, Proofs: nil}
priKeyInt, ok := client.LookUpIntPriKey(txInfo.address)
if ok {
tx.PublicKey = pki.GetBytesFromPublicKey(pki.GetPublicKeyFromScalar(pki.GetPrivateKeyScalarFromInt(priKeyInt)))
tx.SetID() // TODO(RJ): figure out the correct way to set Tx ID.
tx.Sign(pki.GetPrivateKeyScalarFromInt(priKeyInt))
} else {
log.Error("Failed to look up the corresponding private key from address", "Address", txInfo.address)
return
}
txInfo.txs = append(txInfo.txs, &tx)
txInfo.txCount++
}
func printVersion(me string) { func printVersion(me string) {
fmt.Fprintf(os.Stderr, "Harmony (C) 2018. %v, version %v-%v (%v %v)\n", path.Base(me), version, commit, builtBy, builtAt) fmt.Fprintf(os.Stderr, "Harmony (C) 2018. %v, version %v-%v (%v %v)\n", path.Base(me), version, commit, builtBy, builtAt)
os.Exit(0) os.Exit(0)
} }
func main() { func main() {
accountModel := flag.Bool("account_model", true, "Whether to use account model")
configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config") configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config")
maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 20000, "number of transactions to send per message") maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 20000, "number of transactions to send per message")
logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution") logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
numSubset := flag.Int("numSubset", 3, "the number of subsets of utxos to process separately") numSubset := flag.Int("numSubset", 3, "the number of subsets of utxos to process separately")
duration := flag.Int("duration", 60, "duration of the tx generation in second. If it's negative, the experiment runs forever.") duration := flag.Int("duration", 10, "duration of the tx generation in second. If it's negative, the experiment runs forever.")
versionFlag := flag.Bool("version", false, "Output version info") versionFlag := flag.Bool("version", false, "Output version info")
crossShardRatio := flag.Int("cross_shard_ratio", 30, "The percentage of cross shard transactions.") crossShardRatio := flag.Int("cross_shard_ratio", 30, "The percentage of cross shard transactions.")
flag.Parse() flag.Parse()
@ -269,11 +58,13 @@ func main() {
config.ReadConfigFile(*configFile) config.ReadConfigFile(*configFile)
shardIDLeaderMap := config.GetShardIDToLeaderMap() shardIDLeaderMap := config.GetShardIDToLeaderMap()
setting.numOfAddress = 10000
// Do cross shard tx if there are more than one shard // Do cross shard tx if there are more than one shard
setting.crossShard = len(shardIDLeaderMap) > 1 setting := txgen.TxGenSettings{
setting.maxNumTxsPerBatch = *maxNumTxsPerBatch 10000,
setting.crossShardRatio = *crossShardRatio len(shardIDLeaderMap) > 1,
*maxNumTxsPerBatch,
*crossShardRatio,
}
// TODO(Richard): refactor this chuck to a single method // TODO(Richard): refactor this chuck to a single method
// Setup a logger to stdout and log file. // Setup a logger to stdout and log file.
@ -289,7 +80,7 @@ func main() {
for shardID := range shardIDLeaderMap { for shardID := range shardIDLeaderMap {
node := node.New(&consensus.Consensus{ShardID: shardID}, nil) node := node.New(&consensus.Consensus{ShardID: shardID}, nil)
// Assign many fake addresses so we have enough address to play with at first // Assign many fake addresses so we have enough address to play with at first
node.AddTestingAddresses(setting.numOfAddress) node.AddTestingAddresses(setting.NumOfAddress)
nodes = append(nodes, node) nodes = append(nodes, node)
} }
@ -313,6 +104,19 @@ func main() {
utxoPoolMutex.Lock() utxoPoolMutex.Lock()
node.UpdateUtxoAndState(block) node.UpdateUtxoAndState(block)
utxoPoolMutex.Unlock() utxoPoolMutex.Unlock()
accountBlock := new(types.Block)
err := rlp.DecodeBytes(block.AccountBlock, accountBlock)
fmt.Println("RECEIVED NEW BLOCK ", len(accountBlock.Transactions()))
if err != nil {
log.Error("Failed decoding the block with RLP")
} else {
err = node.Worker.CommitTransactions(accountBlock.Transactions(), accountBlock.Coinbase())
node.Worker.UpdateCurrent()
if err != nil {
log.Debug("Failed to add new block to worker", "Error", err)
}
}
} else { } else {
continue continue
} }
@ -335,57 +139,100 @@ func main() {
client.InitLookUpIntPriKeyMap() client.InitLookUpIntPriKeyMap()
subsetCounter := 0 subsetCounter := 0
for { if *accountModel {
t := time.Now() for {
if totalTime > 0 && t.Sub(start).Seconds() >= totalTime { t := time.Now()
log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime) if totalTime > 0 && t.Sub(start).Seconds() >= totalTime {
break log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime)
break
}
shardIDTxsMap := make(map[uint32]types.Transactions)
lock := sync.Mutex{}
var wg sync.WaitGroup
wg.Add(len(shardIDLeaderMap))
utxoPoolMutex.Lock()
log.Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0))
for shardID, _ := range shardIDLeaderMap { // Generate simulated transactions
go func(shardID uint32) {
txs, _ := txgen.GenerateSimulatedTransactionsAccount(int(shardID), nodes, setting)
// TODO: Put cross shard tx into a pending list waiting for proofs from leaders
lock.Lock()
// Put txs into corresponding shards
shardIDTxsMap[shardID] = append(shardIDTxsMap[shardID], txs...)
lock.Unlock()
wg.Done()
}(shardID)
}
wg.Wait()
utxoPoolMutex.Unlock()
lock.Lock()
for shardID, txs := range shardIDTxsMap { // Send the txs to corresponding shards
go func(shardID uint32, txs types.Transactions) {
SendTxsToLeaderAccount(shardIDLeaderMap[shardID], txs)
}(shardID, txs)
}
lock.Unlock()
subsetCounter++
time.Sleep(10000 * time.Millisecond)
} }
shardIDTxsMap := make(map[uint32][]*blockchain.Transaction) } else {
lock := sync.Mutex{} for {
var wg sync.WaitGroup t := time.Now()
wg.Add(len(shardIDLeaderMap)) if totalTime > 0 && t.Sub(start).Seconds() >= totalTime {
log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime)
utxoPoolMutex.Lock() break
log.Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0)) }
for shardID, _ := range shardIDLeaderMap { // Generate simulated transactions shardIDTxsMap := make(map[uint32][]*blockchain.Transaction)
go func(shardID uint32) { lock := sync.Mutex{}
txs, crossTxs := generateSimulatedTransactions(subsetCounter, *numSubset, int(shardID), nodes) var wg sync.WaitGroup
wg.Add(len(shardIDLeaderMap))
// Put cross shard tx into a pending list waiting for proofs from leaders
if clientPort != "" { utxoPoolMutex.Lock()
clientNode.Client.PendingCrossTxsMutex.Lock() log.Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0))
for _, tx := range crossTxs { for shardID, _ := range shardIDLeaderMap { // Generate simulated transactions
clientNode.Client.PendingCrossTxs[tx.ID] = tx go func(shardID uint32) {
txs, crossTxs := txgen.GenerateSimulatedTransactions(subsetCounter, *numSubset, int(shardID), nodes, setting)
// Put cross shard tx into a pending list waiting for proofs from leaders
if clientPort != "" {
clientNode.Client.PendingCrossTxsMutex.Lock()
for _, tx := range crossTxs {
clientNode.Client.PendingCrossTxs[tx.ID] = tx
}
clientNode.Client.PendingCrossTxsMutex.Unlock()
} }
clientNode.Client.PendingCrossTxsMutex.Unlock()
}
lock.Lock() lock.Lock()
// Put txs into corresponding shards // Put txs into corresponding shards
shardIDTxsMap[shardID] = append(shardIDTxsMap[shardID], txs...) shardIDTxsMap[shardID] = append(shardIDTxsMap[shardID], txs...)
for _, crossTx := range crossTxs { for _, crossTx := range crossTxs {
for curShardID, _ := range client.GetInputShardIDsOfCrossShardTx(crossTx) { for curShardID, _ := range client.GetInputShardIDsOfCrossShardTx(crossTx) {
shardIDTxsMap[curShardID] = append(shardIDTxsMap[curShardID], crossTx) shardIDTxsMap[curShardID] = append(shardIDTxsMap[curShardID], crossTx)
}
} }
} lock.Unlock()
lock.Unlock() wg.Done()
wg.Done() }(shardID)
}(shardID) }
} wg.Wait()
wg.Wait() utxoPoolMutex.Unlock()
utxoPoolMutex.Unlock()
lock.Lock()
lock.Lock() for shardID, txs := range shardIDTxsMap { // Send the txs to corresponding shards
for shardID, txs := range shardIDTxsMap { // Send the txs to corresponding shards go func(shardID uint32, txs []*blockchain.Transaction) {
go func(shardID uint32, txs []*blockchain.Transaction) { SendTxsToLeader(shardIDLeaderMap[shardID], txs)
SendTxsToLeader(shardIDLeaderMap[shardID], txs) }(shardID, txs)
}(shardID, txs) }
} lock.Unlock()
lock.Unlock()
subsetCounter++ subsetCounter++
time.Sleep(10000 * time.Millisecond) time.Sleep(10000 * time.Millisecond)
}
} }
// Send a stop message to stop the nodes at the end // Send a stop message to stop the nodes at the end
@ -400,3 +247,9 @@ func SendTxsToLeader(leader p2p.Peer, txs []*blockchain.Transaction) {
msg := proto_node.ConstructTransactionListMessage(txs) msg := proto_node.ConstructTransactionListMessage(txs)
p2p.SendMessage(leader, msg) p2p.SendMessage(leader, msg)
} }
func SendTxsToLeaderAccount(leader p2p.Peer, txs types.Transactions) {
log.Debug("[Generator] Sending account-based txs to...", "leader", leader, "numTxs", len(txs))
msg := proto_node.ConstructTransactionListMessageAccount(txs)
p2p.SendMessage(leader, msg)
}

@ -0,0 +1,32 @@
package txgen
import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/node"
"math/big"
)
type TxGenSettings struct {
NumOfAddress int
CrossShard bool
MaxNumTxsPerBatch int
CrossShardRatio int
}
func GenerateSimulatedTransactionsAccount(shardID int, dataNodes []*node.Node, setting TxGenSettings) (types.Transactions, types.Transactions) {
_ = setting // TODO: take use of settings
node := dataNodes[shardID]
txs := make([]*types.Transaction, 1000)
for i := 0; i < 100; i++ {
baseNonce := node.Worker.GetCurrentState().GetNonce(crypto.PubkeyToAddress(node.TestBankKeys[i].PublicKey))
for j := 0; j < 10; j++ {
randomUserKey, _ := crypto.GenerateKey()
randomUserAddress := crypto.PubkeyToAddress(randomUserKey.PublicKey)
tx, _ := types.SignTx(types.NewTransaction(baseNonce+uint64(j), randomUserAddress, big.NewInt(1000), params.TxGas, nil, nil), types.HomesteadSigner{}, node.TestBankKeys[i])
txs[i*10+j] = tx
}
}
return txs, nil
}

@ -0,0 +1,212 @@
package txgen
import (
"encoding/binary"
"encoding/hex"
"github.com/harmony-one/harmony/blockchain"
"github.com/harmony-one/harmony/client"
"github.com/harmony-one/harmony/crypto/pki"
"github.com/harmony-one/harmony/log"
"github.com/harmony-one/harmony/node"
"math/rand"
)
type TxInfo struct {
// Global Input
shardID int
dataNodes []*node.Node
// Temp Input
id [32]byte
index uint32
value int
address [20]byte
// Output
txs []*blockchain.Transaction
crossTxs []*blockchain.Transaction
txCount int
}
// Generates at most "maxNumTxs" number of simulated transactions based on the current UtxoPools of all shards.
// The transactions are generated by going through the existing utxos and
// randomly select a subset of them as the input for each new transaction. The output
// address of the new transaction are randomly selected from [0 - N), where N is the total number of fake addresses.
//
// When crossShard=true, besides the selected utxo input, select another valid utxo as input from the same address in a second shard.
// Similarly, generate another utxo output in that second shard.
//
// NOTE: the genesis block should contain N coinbase transactions which add
// token (1000) to each address in [0 - N). See node.AddTestingAddresses()
//
// Params:
// subsetId - the which subset of the utxo to work on (used to select addresses)
// shardID - the shardID for current shard
// dataNodes - nodes containing utxopools of all shards
// Returns:
// all single-shard txs
// all cross-shard txs
func GenerateSimulatedTransactions(subsetId, numSubset int, shardID int, dataNodes []*node.Node, setting TxGenSettings) ([]*blockchain.Transaction, []*blockchain.Transaction) {
/*
UTXO map structure:
address - [
txID1 - [
outputIndex1 - value1
outputIndex2 - value2
]
txID2 - [
outputIndex1 - value1
outputIndex2 - value2
]
]
*/
txInfo := TxInfo{}
txInfo.shardID = shardID
txInfo.dataNodes = dataNodes
txInfo.txCount = 0
UTXOLOOP:
// Loop over all addresses
for address, txMap := range dataNodes[shardID].UtxoPool.UtxoMap {
if int(binary.BigEndian.Uint32(address[:]))%numSubset == subsetId%numSubset { // Work on one subset of utxo at a time
txInfo.address = address
// Loop over all txIDs for the address
for txIDStr, utxoMap := range txMap {
// Parse TxId
id, err := hex.DecodeString(txIDStr)
if err != nil {
continue
}
copy(txInfo.id[:], id[:])
// Loop over all utxos for the txID
utxoSize := len(utxoMap)
batchSize := utxoSize / numSubset
i := subsetId % numSubset
counter := 0
for index, value := range utxoMap {
counter++
if batchSize*i < counter && counter > batchSize*(i+1) {
continue
}
txInfo.index = index
txInfo.value = value
randNum := rand.Intn(100)
subsetRatio := 100 // / numSubset
if randNum < subsetRatio { // Sample based on batch size
if setting.CrossShard && randNum < subsetRatio*setting.CrossShardRatio/100 { // 30% cross shard transactions: add another txinput from another shard
generateCrossShardTx(&txInfo, setting)
} else {
generateSingleShardTx(&txInfo, setting)
}
if txInfo.txCount >= setting.MaxNumTxsPerBatch {
break UTXOLOOP
}
}
}
}
}
}
log.Info("UTXO CLIENT", "numUtxo", dataNodes[shardID].UtxoPool.CountNumOfUtxos(), "shardID", shardID)
log.Debug("[Generator] generated transations", "single-shard", len(txInfo.txs), "cross-shard", len(txInfo.crossTxs))
return txInfo.txs, txInfo.crossTxs
}
func generateCrossShardTx(txInfo *TxInfo, setting TxGenSettings) {
nodeShardID := txInfo.dataNodes[txInfo.shardID].Consensus.ShardID
crossShardID := nodeShardID
// a random shard to spend money to
for {
crossShardID = uint32(rand.Intn(len(txInfo.dataNodes)))
if crossShardID != nodeShardID {
break
}
}
//crossShardNode := txInfo.dataNodes[crossShardID]
//crossShardUtxosMap := crossShardNode.UtxoPool.UtxoMap[txInfo.address]
//
//// Get the cross shard utxo from another shard
//var crossTxin *blockchain.TXInput
//crossUtxoValue := 0
//// Loop over utxos for the same address from the other shard and use the first utxo as the second cross tx input
//for crossTxIdStr, crossShardUtxos := range crossShardUtxosMap {
// // Parse TxId
// id, err := hex.DecodeString(crossTxIdStr)
// if err != nil {
// continue
// }
// crossTxId := [32]byte{}
// copy(crossTxId[:], id[:])
//
// for crossShardIndex, crossShardValue := range crossShardUtxos {
// crossUtxoValue = crossShardValue
// crossTxin = blockchain.NewTXInput(blockchain.NewOutPoint(&crossTxId, crossShardIndex), txInfo.address, crossShardID)
// break
// }
// if crossTxin != nil {
// break
// }
//}
// Add the utxo from current shard
txIn := blockchain.NewTXInput(blockchain.NewOutPoint(&txInfo.id, txInfo.index), txInfo.address, nodeShardID)
txInputs := []blockchain.TXInput{*txIn}
// Add the utxo from the other shard, if any
//if crossTxin != nil { // This means the ratio of cross shard tx could be lower than 1/3
// txInputs = append(txInputs, *crossTxin)
//}
// Spend the utxo from the current shard to a random address in [0 - N)
txout := blockchain.TXOutput{Amount: txInfo.value, Address: pki.GetAddressFromInt(rand.Intn(setting.NumOfAddress) + 1), ShardID: crossShardID}
txOutputs := []blockchain.TXOutput{txout}
// Spend the utxo from the other shard, if any, to a random address in [0 - N)
//if crossTxin != nil {
// crossTxout := blockchain.TXOutput{Amount: crossUtxoValue, Address: pki.GetAddressFromInt(rand.Intn(setting.numOfAddress) + 1), ShardID: crossShardID}
// txOutputs = append(txOutputs, crossTxout)
//}
// Construct the new transaction
tx := blockchain.Transaction{ID: [32]byte{}, TxInput: txInputs, TxOutput: txOutputs, Proofs: nil}
priKeyInt, ok := client.LookUpIntPriKey(txInfo.address)
if ok {
tx.PublicKey = pki.GetBytesFromPublicKey(pki.GetPublicKeyFromScalar(pki.GetPrivateKeyScalarFromInt(priKeyInt)))
tx.SetID() // TODO(RJ): figure out the correct way to set Tx ID.
tx.Sign(pki.GetPrivateKeyScalarFromInt(priKeyInt))
} else {
log.Error("Failed to look up the corresponding private key from address", "Address", txInfo.address)
return
}
txInfo.crossTxs = append(txInfo.crossTxs, &tx)
txInfo.txCount++
}
func generateSingleShardTx(txInfo *TxInfo, setting TxGenSettings) {
nodeShardID := txInfo.dataNodes[txInfo.shardID].Consensus.ShardID
// Add the utxo as new tx input
txin := blockchain.NewTXInput(blockchain.NewOutPoint(&txInfo.id, txInfo.index), txInfo.address, nodeShardID)
// Spend the utxo to a random address in [0 - N)
txout := blockchain.TXOutput{Amount: txInfo.value, Address: pki.GetAddressFromInt(rand.Intn(setting.NumOfAddress) + 1), ShardID: nodeShardID}
tx := blockchain.Transaction{ID: [32]byte{}, TxInput: []blockchain.TXInput{*txin}, TxOutput: []blockchain.TXOutput{txout}, Proofs: nil}
priKeyInt, ok := client.LookUpIntPriKey(txInfo.address)
if ok {
tx.PublicKey = pki.GetBytesFromPublicKey(pki.GetPublicKeyFromScalar(pki.GetPrivateKeyScalarFromInt(priKeyInt)))
tx.SetID() // TODO(RJ): figure out the correct way to set Tx ID.
tx.Sign(pki.GetPrivateKeyScalarFromInt(priKeyInt))
} else {
log.Error("Failed to look up the corresponding private key from address", "Address", txInfo.address)
return
}
txInfo.txs = append(txInfo.txs, &tx)
txInfo.txCount++
}

@ -61,6 +61,7 @@ func (consensus *Consensus) WaitForNewBlockAccount(blockChannel chan *types.Bloc
// time.Sleep(500 * time.Millisecond) // time.Sleep(500 * time.Millisecond)
data, err := rlp.EncodeToBytes(newBlock) data, err := rlp.EncodeToBytes(newBlock)
if err == nil { if err == nil {
consensus.Log.Debug("Sample tx", "tx", newBlock.Transactions()[0])
consensus.startConsensus(&blockchain.Block{Hash: newBlock.Hash(), AccountBlock: data}) consensus.startConsensus(&blockchain.Block{Hash: newBlock.Hash(), AccountBlock: data})
} else { } else {
consensus.Log.Error("Failed encoding the block with RLP") consensus.Log.Error("Failed encoding the block with RLP")

@ -189,30 +189,26 @@ func NewBlockChain(db hdb.Database, cacheConfig *CacheConfig, chainConfig *param
return bc, nil return bc, nil
} }
func (bc *BlockChain) ValidateNewBlock(block *types.Block, address common.Address) bool { func (bc *BlockChain) ValidateNewBlock(block *types.Block, address common.Address) error {
state, err := state.New(bc.CurrentBlock().Root(), bc.stateCache) state, err := state.New(bc.CurrentBlock().Root(), bc.stateCache)
fmt.Println("WITHIN NNNNNNNNNNNNNN", err)
if err != nil { if err != nil {
return false return err
} }
fmt.Println("Balance 3 ", state.GetBalance(address))
// Process block using the parent state as reference point. // Process block using the parent state as reference point.
receipts, _, usedGas, err := bc.processor.Process(block, state, bc.vmConfig) receipts, _, usedGas, err := bc.processor.Process(block, state, bc.vmConfig)
if err != nil { if err != nil {
bc.reportBlock(block, receipts, err) bc.reportBlock(block, receipts, err)
return false return err
} }
fmt.Println("WITHIN NNNNNNNNNNNNNN2", err)
err = bc.Validator().ValidateState(block, bc.CurrentBlock(), state, receipts, usedGas) err = bc.Validator().ValidateState(block, bc.CurrentBlock(), state, receipts, usedGas)
if err != nil { if err != nil {
bc.reportBlock(block, receipts, err) bc.reportBlock(block, receipts, err)
return false return err
} }
fmt.Println("WITHIN NNNNNNNNNNNNNN3", err) return nil
return true
} }
func (bc *BlockChain) getProcInterrupt() bool { func (bc *BlockChain) getProcInterrupt() bool {

@ -115,7 +115,6 @@ func (h *storageJSON) UnmarshalText(text []byte) error {
} }
offset := len(h) - len(text)/2 // pad on the left offset := len(h) - len(text)/2 // pad on the left
if _, err := hex.Decode(h[offset:], text); err != nil { if _, err := hex.Decode(h[offset:], text); err != nil {
fmt.Println(err)
return fmt.Errorf("invalid hex storage key/value %q", text) return fmt.Errorf("invalid hex storage key/value %q", text)
} }
return nil return nil
@ -246,7 +245,7 @@ func (g *Genesis) ToBlock(db hdb.Database) *types.Block {
Root: root, Root: root,
} }
if g.GasLimit == 0 { if g.GasLimit == 0 {
head.GasLimit = params.GenesisGasLimit head.GasLimit = 10000000000 // TODO(RJ): figure out better solution. // params.GenesisGasLimit
} }
if g.Difficulty == nil { if g.Difficulty == nil {
head.Difficulty = params.GenesisDifficulty head.Difficulty = params.GenesisDifficulty

@ -16,18 +16,18 @@ type ConfigEntry struct {
leader p2p.Peer leader p2p.Peer
self p2p.Peer self p2p.Peer
peers []p2p.Peer peers []p2p.Peer
pubK kyber.Scalar priK kyber.Scalar
priK kyber.Point pubK kyber.Point
} }
func (config ConfigEntry) String() string { func (config ConfigEntry) String() string {
return fmt.Sprintf("idc: %v:%v", config.IP, config.Port) return fmt.Sprintf("idc: %v:%v", config.IP, config.Port)
} }
func New(pubK kyber.Scalar, priK kyber.Point) *ConfigEntry { func New(priK kyber.Scalar, pubK kyber.Point) *ConfigEntry {
var config ConfigEntry var config ConfigEntry
config.pubK = pubK
config.priK = priK config.priK = priK
config.pubK = pubK
config.peers = make([]p2p.Peer, 0) config.peers = make([]p2p.Peer, 0)

@ -84,7 +84,6 @@ func main() {
txs := make([]*types.Transaction, 100) txs := make([]*types.Transaction, 100)
worker := worker.New(params.TestChainConfig, chain, consensus.NewFaker()) worker := worker.New(params.TestChainConfig, chain, consensus.NewFaker())
fmt.Println(worker.GetCurrentState().GetBalance(testBankAddress)) fmt.Println(worker.GetCurrentState().GetBalance(testBankAddress))
fmt.Println(worker.Commit().Root())
for i, _ := range txs { for i, _ := range txs {
randomUserKey, _ := crypto.GenerateKey() randomUserKey, _ := crypto.GenerateKey()
@ -96,5 +95,4 @@ func main() {
worker.CommitTransactions(txs, crypto.PubkeyToAddress(testBankKey.PublicKey)) worker.CommitTransactions(txs, crypto.PubkeyToAddress(testBankKey.PublicKey))
fmt.Println(worker.GetCurrentState().GetBalance(testBankAddress)) fmt.Println(worker.GetCurrentState().GetBalance(testBankAddress))
fmt.Println(worker.Commit().Root())
} }

@ -6,6 +6,7 @@ import (
"encoding/gob" "encoding/gob"
"fmt" "fmt"
"math/big" "math/big"
"math/rand"
"net" "net"
"strings" "strings"
"sync" "sync"
@ -73,16 +74,18 @@ type Node struct {
State NodeState // State of the Node State NodeState // State of the Node
// Account Model // Account Model
Chain *core.BlockChain pendingTransactionsAccount types.Transactions // TODO: replace with txPool
TxPool *core.TxPool pendingTxMutexAccount sync.Mutex
BlockChannelAccount chan *types.Block // The channel to receive new blocks from Node Chain *core.BlockChain
worker *worker.Worker TxPool *core.TxPool
BlockChannelAccount chan *types.Block // The channel to receive new blocks from Node
Worker *worker.Worker
// Syncing component. // Syncing component.
downloaderServer *downloader.Server downloaderServer *downloader.Server
// Test only // Test only
testBankKey *ecdsa.PrivateKey TestBankKeys []*ecdsa.PrivateKey
} }
// Add new crossTx and proofs to the list of crossTx that needs to be sent back to client // Add new crossTx and proofs to the list of crossTx that needs to be sent back to client
@ -101,6 +104,14 @@ func (node *Node) addPendingTransactions(newTxs []*blockchain.Transaction) {
node.log.Debug("Got more transactions", "num", len(newTxs), "totalPending", len(node.pendingTransactions), "node", node) node.log.Debug("Got more transactions", "num", len(newTxs), "totalPending", len(node.pendingTransactions), "node", node)
} }
// Add new transactions to the pending transaction list
func (node *Node) addPendingTransactionsAccount(newTxs types.Transactions) {
node.pendingTxMutexAccount.Lock()
node.pendingTransactionsAccount = append(node.pendingTransactionsAccount, newTxs...)
node.pendingTxMutexAccount.Unlock()
node.log.Debug("Got more transactions (account model)", "num", len(newTxs), "totalPending", len(node.pendingTransactionsAccount), "node", node)
}
// Take out a subset of valid transactions from the pending transaction list // Take out a subset of valid transactions from the pending transaction list
// Note the pending transaction list will then contain the rest of the txs // Note the pending transaction list will then contain the rest of the txs
func (node *Node) getTransactionsForNewBlock(maxNumTxs int) ([]*blockchain.Transaction, []*blockchain.CrossShardTxAndProof) { func (node *Node) getTransactionsForNewBlock(maxNumTxs int) ([]*blockchain.Transaction, []*blockchain.CrossShardTxAndProof) {
@ -114,6 +125,20 @@ func (node *Node) getTransactionsForNewBlock(maxNumTxs int) ([]*blockchain.Trans
return selected, crossShardTxs return selected, crossShardTxs
} }
// Take out a subset of valid transactions from the pending transaction list
// Note the pending transaction list will then contain the rest of the txs
func (node *Node) getTransactionsForNewBlockAccount(maxNumTxs int) (types.Transactions, []*blockchain.CrossShardTxAndProof) {
node.pendingTxMutexAccount.Lock()
selected, unselected, invalid, crossShardTxs := node.pendingTransactionsAccount, types.Transactions{}, types.Transactions{}, []*blockchain.CrossShardTxAndProof{}
_ = invalid // invalid txs are discard
node.log.Debug("Invalid transactions discarded", "number", len(invalid))
node.pendingTransactionsAccount = unselected
node.log.Debug("Remaining pending transactions", "number", len(node.pendingTransactionsAccount))
node.pendingTxMutexAccount.Unlock()
return selected, crossShardTxs //TODO: replace cross-shard proofs for account model
}
// StartServer starts a server and process the request by a handler. // StartServer starts a server and process the request by a handler.
func (node *Node) StartServer(port string) { func (node *Node) StartServer(port string) {
if node.SyncNode { if node.SyncNode {
@ -169,6 +194,17 @@ func (node *Node) countNumTransactionsInBlockchain() int {
return count return count
} }
// Count the total number of transactions in the blockchain
// Currently used for stats reporting purpose
func (node *Node) countNumTransactionsInBlockchainAccount() int {
count := 0
for curBlock := node.Chain.CurrentBlock(); curBlock != nil; {
count += len(curBlock.Transactions())
curBlock = node.Chain.GetBlockByHash(curBlock.ParentHash())
}
return count
}
//ConnectIdentityChain connects to identity chain //ConnectIdentityChain connects to identity chain
func (node *Node) ConnectBeaconChain() { func (node *Node) ConnectBeaconChain() {
Nnode := &NetworkNode{SelfPeer: node.SelfPeer, IDCPeer: node.IDCPeer} Nnode := &NetworkNode{SelfPeer: node.SelfPeer, IDCPeer: node.IDCPeer}
@ -230,14 +266,26 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase) *Node {
node.db = db node.db = db
// (account model) // (account model)
rand.Seed(0)
len := 1000000
bytes := make([]byte, len)
for i := 0; i < len; i++ {
bytes[i] = byte(rand.Intn(100))
}
reader := strings.NewReader(string(bytes))
genesisAloc := make(core.GenesisAlloc)
for i := 0; i < 100; i++ {
testBankKey, _ := ecdsa.GenerateKey(crypto.S256(), reader)
testBankAddress := crypto.PubkeyToAddress(testBankKey.PublicKey)
testBankFunds := big.NewInt(10000000000)
genesisAloc[testBankAddress] = core.GenesisAccount{Balance: testBankFunds}
node.TestBankKeys = append(node.TestBankKeys, testBankKey)
}
node.testBankKey, _ = ecdsa.GenerateKey(crypto.S256(), strings.NewReader("Fixed source of randomnessasdffffffffffffffffffffffffffffffffffffffffsdffffffffffffffffffffffffffffffffffffffffffffffffffffff"))
testBankAddress := crypto.PubkeyToAddress(node.testBankKey.PublicKey)
testBankFunds := big.NewInt(1000000000000000000)
database := hdb.NewMemDatabase() database := hdb.NewMemDatabase()
gspec := core.Genesis{ gspec := core.Genesis{
Config: params.TestChainConfig, Config: params.TestChainConfig,
Alloc: core.GenesisAlloc{testBankAddress: {Balance: testBankFunds}}, Alloc: genesisAloc,
} }
_ = gspec.MustCommit(database) _ = gspec.MustCommit(database)
@ -246,11 +294,7 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase) *Node {
node.Chain = chain node.Chain = chain
node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, params.TestChainConfig, chain) node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, params.TestChainConfig, chain)
node.BlockChannelAccount = make(chan *types.Block) node.BlockChannelAccount = make(chan *types.Block)
node.worker = worker.New(params.TestChainConfig, chain, bft.NewFaker()) node.Worker = worker.New(params.TestChainConfig, chain, bft.NewFaker())
fmt.Println("BALANCE")
fmt.Println(node.worker.GetCurrentState().GetBalance(testBankAddress))
} }
// Logger // Logger
node.log = log.New() node.log = log.New()

@ -5,17 +5,14 @@ import (
"bytes" "bytes"
"encoding/gob" "encoding/gob"
"fmt" "fmt"
"math/big" "github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/crypto/pki"
"net" "net"
"os" "os"
"strconv" "strconv"
"time" "time"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/blockchain" "github.com/harmony-one/harmony/blockchain"
hmy_crypto "github.com/harmony-one/harmony/crypto" hmy_crypto "github.com/harmony-one/harmony/crypto"
"github.com/harmony-one/harmony/p2p" "github.com/harmony-one/harmony/p2p"
@ -143,45 +140,49 @@ func (node *Node) NodeHandler(conn net.Conn) {
node.log.Info("NET: received message: Node/Control") node.log.Info("NET: received message: Node/Control")
controlType := msgPayload[0] controlType := msgPayload[0]
if proto_node.ControlMessageType(controlType) == proto_node.STOP { if proto_node.ControlMessageType(controlType) == proto_node.STOP {
node.log.Debug("Stopping Node", "node", node, "numBlocks", len(node.blockchain.Blocks), "numTxsProcessed", node.countNumTransactionsInBlockchain()) if node.Chain == nil {
node.log.Debug("Stopping Node", "node", node, "numBlocks", len(node.blockchain.Blocks), "numTxsProcessed", node.countNumTransactionsInBlockchain())
sizeInBytes := node.UtxoPool.GetSizeInByteOfUtxoMap()
node.log.Debug("UtxoPool Report", "numEntries", len(node.UtxoPool.UtxoMap), "sizeInBytes", sizeInBytes) sizeInBytes := node.UtxoPool.GetSizeInByteOfUtxoMap()
node.log.Debug("UtxoPool Report", "numEntries", len(node.UtxoPool.UtxoMap), "sizeInBytes", sizeInBytes)
avgBlockSizeInBytes := 0
txCount := 0 avgBlockSizeInBytes := 0
blockCount := 0 txCount := 0
totalTxCount := 0 blockCount := 0
totalBlockCount := 0 totalTxCount := 0
avgTxSize := 0 totalBlockCount := 0
avgTxSize := 0
for _, block := range node.blockchain.Blocks {
if block.IsStateBlock() { for _, block := range node.blockchain.Blocks {
totalTxCount += int(block.State.NumTransactions) if block.IsStateBlock() {
totalBlockCount += int(block.State.NumBlocks) totalTxCount += int(block.State.NumTransactions)
} else { totalBlockCount += int(block.State.NumBlocks)
byteBuffer := bytes.NewBuffer([]byte{}) } else {
encoder := gob.NewEncoder(byteBuffer) byteBuffer := bytes.NewBuffer([]byte{})
encoder.Encode(block) encoder := gob.NewEncoder(byteBuffer)
avgBlockSizeInBytes += len(byteBuffer.Bytes()) encoder.Encode(block)
avgBlockSizeInBytes += len(byteBuffer.Bytes())
txCount += len(block.Transactions)
blockCount++ txCount += len(block.Transactions)
totalTxCount += len(block.TransactionIds) blockCount++
totalBlockCount++ totalTxCount += len(block.TransactionIds)
totalBlockCount++
byteBuffer = bytes.NewBuffer([]byte{})
encoder = gob.NewEncoder(byteBuffer) byteBuffer = bytes.NewBuffer([]byte{})
encoder.Encode(block.Transactions) encoder = gob.NewEncoder(byteBuffer)
avgTxSize += len(byteBuffer.Bytes()) encoder.Encode(block.Transactions)
avgTxSize += len(byteBuffer.Bytes())
}
}
if blockCount != 0 {
avgBlockSizeInBytes = avgBlockSizeInBytes / blockCount
avgTxSize = avgTxSize / txCount
} }
}
if blockCount != 0 {
avgBlockSizeInBytes = avgBlockSizeInBytes / blockCount
avgTxSize = avgTxSize / txCount
}
node.log.Debug("Blockchain Report", "totalNumBlocks", totalBlockCount, "avgBlockSizeInCurrentEpoch", avgBlockSizeInBytes, "totalNumTxs", totalTxCount, "avgTxSzieInCurrentEpoch", avgTxSize) node.log.Debug("Blockchain Report", "totalNumBlocks", totalBlockCount, "avgBlockSizeInCurrentEpoch", avgBlockSizeInBytes, "totalNumTxs", totalTxCount, "avgTxSzieInCurrentEpoch", avgTxSize)
} else {
node.log.Debug("Stopping Node (Account Model)", "node", node, "CurBlockNum", node.Chain.CurrentHeader().Number, "numTxsProcessed", node.countNumTransactionsInBlockchainAccount())
}
os.Exit(0) os.Exit(0)
} }
@ -260,13 +261,23 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) {
switch txMessageType { switch txMessageType {
case proto_node.Send: case proto_node.Send:
txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the Send messge type if node.Chain != nil {
txList := new([]*blockchain.Transaction) txs := types.Transactions{}
err := txDecoder.Decode(txList) err := rlp.Decode(bytes.NewReader(msgPayload[1:]), &txs) // skip the Send messge type
if err != nil { if err != nil {
node.log.Error("Failed to deserialize transaction list", "error", err) node.log.Error("Failed to deserialize transaction list", "error", err)
}
node.addPendingTransactionsAccount(txs)
} else {
txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the Send messge type
txList := new([]*blockchain.Transaction)
err := txDecoder.Decode(&txList)
if err != nil {
node.log.Error("Failed to deserialize transaction list", "error", err)
}
node.addPendingTransactions(*txList)
} }
node.addPendingTransactions(*txList)
case proto_node.Request: case proto_node.Request:
reader := bytes.NewBuffer(msgPayload[1:]) reader := bytes.NewBuffer(msgPayload[1:])
txIDs := make(map[[32]byte]bool) txIDs := make(map[[32]byte]bool)
@ -379,21 +390,31 @@ func (node *Node) WaitForConsensusReadyAccount(readySignal chan struct{}) {
} }
if !retry { if !retry {
// Normal tx block consensus for {
// TODO: add new block generation logic if len(node.pendingTransactionsAccount) >= 1000 {
txs := make([]*types.Transaction, 100) // Normal tx block consensus
for i, _ := range txs { selectedTxs, _ := node.getTransactionsForNewBlockAccount(MaxNumberOfTransactionsPerBlock)
randomUserKey, _ := crypto.GenerateKey() err := node.Worker.UpdateCurrent()
randomUserAddress := crypto.PubkeyToAddress(randomUserKey.PublicKey) if err != nil {
tx, _ := types.SignTx(types.NewTransaction(node.worker.GetCurrentState().GetNonce(crypto.PubkeyToAddress(node.testBankKey.PublicKey)), randomUserAddress, big.NewInt(1000), params.TxGas, nil, nil), types.HomesteadSigner{}, node.testBankKey) node.log.Debug("Failed updating worker's state", "Error", err)
txs[i] = tx }
err = node.Worker.CommitTransactions(selectedTxs, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey))
if err == nil {
block, err := node.Worker.Commit()
if err != nil {
node.log.Debug("Failed commiting new block", "Error", err)
} else {
newBlock = block
break
}
} else {
node.log.Debug("Failed to create new block", "Error", err)
}
}
// If not enough transactions to run Consensus,
// periodically check whether we have enough transactions to package into block.
time.Sleep(1 * time.Second)
} }
node.worker.CommitTransactions(txs, crypto.PubkeyToAddress(node.testBankKey.PublicKey))
newBlock = node.worker.Commit()
// If not enough transactions to run Consensus,
// periodically check whether we have enough transactions to package into block.
time.Sleep(1 * time.Second)
} }
// Send the new block to Consensus so it can be confirmed. // Send the new block to Consensus so it can be confirmed.
@ -446,11 +467,12 @@ func (node *Node) VerifyNewBlock(newBlock *blockchain.Block) bool {
// VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on // VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on
func (node *Node) VerifyNewBlockAccount(newBlock *types.Block) bool { func (node *Node) VerifyNewBlockAccount(newBlock *types.Block) bool {
fmt.Println("VerifyingNNNNNNNNNNNNNN") err := node.Chain.ValidateNewBlock(newBlock, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey))
if err != nil {
fmt.Println("BALANCE 1") node.log.Debug("Failed verifying new block", "Error", err, "tx", newBlock.Transactions()[0])
fmt.Println(node.worker.GetCurrentState().GetBalance(crypto.PubkeyToAddress(node.testBankKey.PublicKey))) return false
return node.Chain.ValidateNewBlock(newBlock, crypto.PubkeyToAddress(node.testBankKey.PublicKey)) }
return true
} }
// PostConsensusProcessing is called by consensus participants, after consensus is done, to: // PostConsensusProcessing is called by consensus participants, after consensus is done, to:
@ -483,11 +505,26 @@ func (node *Node) PostConsensusProcessing(newBlock *blockchain.Block) {
node.BroadcastNewBlock(newBlock) node.BroadcastNewBlock(newBlock)
} }
accountBlock := new(types.Block)
err := rlp.DecodeBytes(newBlock.AccountBlock, accountBlock)
if err != nil {
node.log.Error("Failed decoding the block with RLP")
}
node.AddNewBlock(newBlock) node.AddNewBlock(newBlock)
node.UpdateUtxoAndState(newBlock) node.UpdateUtxoAndState(newBlock)
}
// AddNewBlockAccount is usedd to add new block into the blockchain.
func (node *Node) AddNewBlockAccount(newBlock *types.Block) {
num, err := node.Chain.InsertChain([]*types.Block{newBlock})
if err != nil {
node.log.Debug("Error adding to chain", "numBlocks", num, "Error", err)
}
} }
// AddNewBlock is usedd to add new block into the blockchain. // AddNewBlock is usedd to add new block into the utxo-based blockchain.
func (node *Node) AddNewBlock(newBlock *blockchain.Block) { func (node *Node) AddNewBlock(newBlock *blockchain.Block) {
// Add it to blockchain // Add it to blockchain
node.blockchain.Blocks = append(node.blockchain.Blocks, newBlock) node.blockchain.Blocks = append(node.blockchain.Blocks, newBlock)
@ -496,6 +533,14 @@ func (node *Node) AddNewBlock(newBlock *blockchain.Block) {
node.log.Info("Writing new block into disk.") node.log.Info("Writing new block into disk.")
newBlock.Write(node.db, strconv.Itoa(len(node.blockchain.Blocks))) newBlock.Write(node.db, strconv.Itoa(len(node.blockchain.Blocks)))
} }
// Account model
accountBlock := new(types.Block)
err := rlp.DecodeBytes(newBlock.AccountBlock, accountBlock)
if err != nil {
node.log.Error("Failed decoding the block with RLP")
}
node.AddNewBlockAccount(accountBlock)
} }
// UpdateUtxoAndState updates Utxo and state. // UpdateUtxoAndState updates Utxo and state.

@ -1,9 +1,6 @@
package worker package worker
import ( import (
"math/big"
"time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/params"
"github.com/harmony-one/harmony/consensus" "github.com/harmony-one/harmony/consensus"
@ -11,6 +8,8 @@ import (
"github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/state"
"github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/types"
"github.com/harmony-one/harmony/core/vm" "github.com/harmony-one/harmony/core/vm"
"math/big"
"time"
) )
// environment is the worker's current environment and holds all of the current state information. // environment is the worker's current environment and holds all of the current state information.
@ -50,14 +49,34 @@ func (w *Worker) commitTransaction(tx *types.Transaction, coinbase common.Addres
return receipt.Logs, nil return receipt.Logs, nil
} }
func (w *Worker) CommitTransactions(txs []*types.Transaction, coinbase common.Address) { func (w *Worker) CommitTransactions(txs []*types.Transaction, coinbase common.Address) error {
snap := w.current.state.Snapshot()
if w.current.gasPool == nil { if w.current.gasPool == nil {
w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit) w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit)
} }
for _, tx := range txs { for _, tx := range txs {
w.commitTransaction(tx, coinbase) _, err := w.commitTransaction(tx, coinbase)
if err != nil {
w.current.state.RevertToSnapshot(snap)
return err
}
}
return nil
}
func (w *Worker) UpdateCurrent() error {
parent := w.chain.CurrentBlock()
num := parent.Number()
timestamp := time.Now().Unix()
header := &types.Header{
ParentHash: parent.Hash(),
Number: num.Add(num, common.Big1),
GasLimit: core.CalcGasLimit(parent, w.gasFloor, w.gasCeil),
Time: big.NewInt(timestamp),
} }
return w.makeCurrent(parent, header)
} }
// makeCurrent creates a new environment for the current cycle. // makeCurrent creates a new environment for the current cycle.
@ -79,13 +98,13 @@ func (w *Worker) GetCurrentState() *state.StateDB {
return w.current.state return w.current.state
} }
func (w *Worker) Commit() *types.Block { func (w *Worker) Commit() (*types.Block, error) {
s := w.current.state.Copy() s := w.current.state.Copy()
block, err := w.engine.Finalize(w.chain, w.current.header, s, w.current.txs, w.current.receipts) block, err := w.engine.Finalize(w.chain, w.current.header, s, w.current.txs, w.current.receipts)
if err != nil { if err != nil {
return nil return nil, err
} }
return block return block, nil
} }
func New(config *params.ChainConfig, chain *core.BlockChain, engine consensus.Engine) *Worker { func New(config *params.ChainConfig, chain *core.BlockChain, engine consensus.Engine) *Worker {
@ -95,7 +114,7 @@ func New(config *params.ChainConfig, chain *core.BlockChain, engine consensus.En
engine: engine, engine: engine,
} }
worker.gasFloor = 0 worker.gasFloor = 0
worker.gasCeil = 10000000 worker.gasCeil = 1000000000000000
parent := worker.chain.CurrentBlock() parent := worker.chain.CurrentBlock()
num := parent.Number() num := parent.Number()

@ -3,6 +3,9 @@ package node
import ( import (
"bytes" "bytes"
"encoding/gob" "encoding/gob"
"fmt"
"github.com/ethereum/go-ethereum/rlp"
"github.com/harmony-one/harmony/core/types"
"log" "log"
"github.com/harmony-one/harmony/blockchain" "github.com/harmony-one/harmony/blockchain"
@ -144,6 +147,21 @@ func ConstructTransactionListMessage(transactions []*blockchain.Transaction) []b
return byteBuffer.Bytes() return byteBuffer.Bytes()
} }
// ConstructTransactionListMessageAccount constructs serialized transactions in account model
func ConstructTransactionListMessageAccount(transactions types.Transactions) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})
byteBuffer.WriteByte(byte(Transaction))
byteBuffer.WriteByte(byte(Send))
txs, err := rlp.EncodeToBytes(transactions)
if err != nil {
fmt.Errorf("ERROR RLP %s", err)
return []byte{} // TODO(RJ): better handle of the error
}
byteBuffer.Write(txs)
return byteBuffer.Bytes()
}
// ConstructBlockchainSyncMessage constructs Blockchain Sync Message. // ConstructBlockchainSyncMessage constructs Blockchain Sync Message.
func ConstructBlockchainSyncMessage(msgType BlockchainSyncMessageType, blockHash [32]byte) []byte { func ConstructBlockchainSyncMessage(msgType BlockchainSyncMessageType, blockHash [32]byte) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)}) byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)})

@ -18,17 +18,14 @@ package trie
import ( import (
"fmt" "fmt"
"io"
"sync"
"time"
hdb "github.com/harmony-one/harmony/db"
"github.com/simple-rules/harmony-benchmark/db"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rlp"
hdb "github.com/harmony-one/harmony/db"
"io"
"sync"
"time"
) )
var ( var (
@ -674,7 +671,7 @@ func (db *Database) Commit(node common.Hash, report bool) error {
} }
// commit is the private locked version of Commit. // commit is the private locked version of Commit.
func (db *Database) commit(hash common.Hash, batch db.Batch) error { func (db *Database) commit(hash common.Hash, batch hdb.Batch) error {
// If the node does not exist, it's a previously committed node // If the node does not exist, it's a previously committed node
node, ok := db.nodes[hash] node, ok := db.nodes[hash]
if !ok { if !ok {

Loading…
Cancel
Save