diff --git a/benchmark.go b/benchmark.go index 25db29d70..af17637fa 100644 --- a/benchmark.go +++ b/benchmark.go @@ -74,7 +74,7 @@ func loggingInit(logFolder, role, ip, port string, onlyLogTps bool) { } func main() { - accountModel := flag.Bool("account_model", false, "Whether to use account model") + accountModel := flag.Bool("account_model", true, "Whether to use account model") // TODO: use http://getmyipaddress.org/ or http://www.get-myip.com/ to retrieve my IP address ip := flag.String("ip", "127.0.0.1", "IP of the node") port := flag.String("port", "9000", "port of the node.") @@ -113,15 +113,14 @@ func main() { var leader p2p.Peer var selfPeer p2p.Peer var clientPeer *p2p.Peer - // Use Peer Discovery to get shard/leader/peer/... + priKey, pubKey := utils.GenKey(*ip, *port) if *peerDisvoery { - pubKey, priKey := utils.GenKey(*ip, *port) // Contact Identity Chain // This is a blocking call // Assume @ak has get it working // TODO: this has to work with @ak's fix - discoveryConfig := discovery.New(pubKey, priKey) + discoveryConfig := discovery.New(priKey, pubKey) err := discoveryConfig.StartClientMode(*idcIP, *idcPort) if err != nil { @@ -144,6 +143,7 @@ func main() { // Create client peer. clientPeer = distributionConfig.GetClientPeer() } + selfPeer.PubKey = pubKey var role string if leader.Ip == *ip && leader.Port == *port { diff --git a/client/txgen/main.go b/client/txgen/main.go index 532254b47..1e89f56bd 100644 --- a/client/txgen/main.go +++ b/client/txgen/main.go @@ -1,11 +1,11 @@ package main import ( - "encoding/binary" - "encoding/hex" "flag" "fmt" - "math/rand" + "github.com/ethereum/go-ethereum/rlp" + "github.com/harmony-one/harmony/client/txgen/txgen" + "github.com/harmony-one/harmony/core/types" "os" "path" "runtime" @@ -16,7 +16,6 @@ import ( "github.com/harmony-one/harmony/client" client_config "github.com/harmony-one/harmony/client/config" "github.com/harmony-one/harmony/consensus" - "github.com/harmony-one/harmony/crypto/pki" "github.com/harmony-one/harmony/log" "github.com/harmony-one/harmony/node" "github.com/harmony-one/harmony/p2p" @@ -24,235 +23,25 @@ import ( ) var ( - version string - builtBy string - builtAt string - commit string -) - -type txGenSettings struct { - numOfAddress int - crossShard bool - maxNumTxsPerBatch int - crossShardRatio int -} - -var ( + version string + builtBy string + builtAt string + commit string utxoPoolMutex sync.Mutex - setting txGenSettings ) -type TxInfo struct { - // Global Input - shardID int - dataNodes []*node.Node - // Temp Input - id [32]byte - index uint32 - value int - address [20]byte - // Output - txs []*blockchain.Transaction - crossTxs []*blockchain.Transaction - txCount int -} - -// Generates at most "maxNumTxs" number of simulated transactions based on the current UtxoPools of all shards. -// The transactions are generated by going through the existing utxos and -// randomly select a subset of them as the input for each new transaction. The output -// address of the new transaction are randomly selected from [0 - N), where N is the total number of fake addresses. -// -// When crossShard=true, besides the selected utxo input, select another valid utxo as input from the same address in a second shard. -// Similarly, generate another utxo output in that second shard. -// -// NOTE: the genesis block should contain N coinbase transactions which add -// token (1000) to each address in [0 - N). See node.AddTestingAddresses() -// -// Params: -// subsetId - the which subset of the utxo to work on (used to select addresses) -// shardID - the shardID for current shard -// dataNodes - nodes containing utxopools of all shards -// Returns: -// all single-shard txs -// all cross-shard txs -func generateSimulatedTransactions(subsetId, numSubset int, shardID int, dataNodes []*node.Node) ([]*blockchain.Transaction, []*blockchain.Transaction) { - /* - UTXO map structure: - address - [ - txID1 - [ - outputIndex1 - value1 - outputIndex2 - value2 - ] - txID2 - [ - outputIndex1 - value1 - outputIndex2 - value2 - ] - ] - */ - - txInfo := TxInfo{} - txInfo.shardID = shardID - txInfo.dataNodes = dataNodes - txInfo.txCount = 0 - -UTXOLOOP: - // Loop over all addresses - for address, txMap := range dataNodes[shardID].UtxoPool.UtxoMap { - if int(binary.BigEndian.Uint32(address[:]))%numSubset == subsetId%numSubset { // Work on one subset of utxo at a time - txInfo.address = address - // Loop over all txIDs for the address - for txIDStr, utxoMap := range txMap { - // Parse TxId - id, err := hex.DecodeString(txIDStr) - if err != nil { - continue - } - copy(txInfo.id[:], id[:]) - - // Loop over all utxos for the txID - utxoSize := len(utxoMap) - batchSize := utxoSize / numSubset - i := subsetId % numSubset - counter := 0 - for index, value := range utxoMap { - counter++ - if batchSize*i < counter && counter > batchSize*(i+1) { - continue - } - txInfo.index = index - txInfo.value = value - - randNum := rand.Intn(100) - - subsetRatio := 100 // / numSubset - if randNum < subsetRatio { // Sample based on batch size - if setting.crossShard && randNum < subsetRatio*setting.crossShardRatio/100 { // 30% cross shard transactions: add another txinput from another shard - generateCrossShardTx(&txInfo) - } else { - generateSingleShardTx(&txInfo) - } - if txInfo.txCount >= setting.maxNumTxsPerBatch { - break UTXOLOOP - } - } - } - } - } - } - log.Info("UTXO CLIENT", "numUtxo", dataNodes[shardID].UtxoPool.CountNumOfUtxos(), "shardID", shardID) - log.Debug("[Generator] generated transations", "single-shard", len(txInfo.txs), "cross-shard", len(txInfo.crossTxs)) - return txInfo.txs, txInfo.crossTxs -} - -func generateCrossShardTx(txInfo *TxInfo) { - nodeShardID := txInfo.dataNodes[txInfo.shardID].Consensus.ShardID - crossShardID := nodeShardID - // a random shard to spend money to - for { - crossShardID = uint32(rand.Intn(len(txInfo.dataNodes))) - if crossShardID != nodeShardID { - break - } - } - - //crossShardNode := txInfo.dataNodes[crossShardID] - //crossShardUtxosMap := crossShardNode.UtxoPool.UtxoMap[txInfo.address] - // - //// Get the cross shard utxo from another shard - //var crossTxin *blockchain.TXInput - //crossUtxoValue := 0 - //// Loop over utxos for the same address from the other shard and use the first utxo as the second cross tx input - //for crossTxIdStr, crossShardUtxos := range crossShardUtxosMap { - // // Parse TxId - // id, err := hex.DecodeString(crossTxIdStr) - // if err != nil { - // continue - // } - // crossTxId := [32]byte{} - // copy(crossTxId[:], id[:]) - // - // for crossShardIndex, crossShardValue := range crossShardUtxos { - // crossUtxoValue = crossShardValue - // crossTxin = blockchain.NewTXInput(blockchain.NewOutPoint(&crossTxId, crossShardIndex), txInfo.address, crossShardID) - // break - // } - // if crossTxin != nil { - // break - // } - //} - - // Add the utxo from current shard - txIn := blockchain.NewTXInput(blockchain.NewOutPoint(&txInfo.id, txInfo.index), txInfo.address, nodeShardID) - txInputs := []blockchain.TXInput{*txIn} - - // Add the utxo from the other shard, if any - //if crossTxin != nil { // This means the ratio of cross shard tx could be lower than 1/3 - // txInputs = append(txInputs, *crossTxin) - //} - - // Spend the utxo from the current shard to a random address in [0 - N) - txout := blockchain.TXOutput{Amount: txInfo.value, Address: pki.GetAddressFromInt(rand.Intn(setting.numOfAddress) + 1), ShardID: crossShardID} - - txOutputs := []blockchain.TXOutput{txout} - - // Spend the utxo from the other shard, if any, to a random address in [0 - N) - //if crossTxin != nil { - // crossTxout := blockchain.TXOutput{Amount: crossUtxoValue, Address: pki.GetAddressFromInt(rand.Intn(setting.numOfAddress) + 1), ShardID: crossShardID} - // txOutputs = append(txOutputs, crossTxout) - //} - - // Construct the new transaction - tx := blockchain.Transaction{ID: [32]byte{}, TxInput: txInputs, TxOutput: txOutputs, Proofs: nil} - - priKeyInt, ok := client.LookUpIntPriKey(txInfo.address) - if ok { - tx.PublicKey = pki.GetBytesFromPublicKey(pki.GetPublicKeyFromScalar(pki.GetPrivateKeyScalarFromInt(priKeyInt))) - - tx.SetID() // TODO(RJ): figure out the correct way to set Tx ID. - tx.Sign(pki.GetPrivateKeyScalarFromInt(priKeyInt)) - } else { - log.Error("Failed to look up the corresponding private key from address", "Address", txInfo.address) - return - } - - txInfo.crossTxs = append(txInfo.crossTxs, &tx) - txInfo.txCount++ -} - -func generateSingleShardTx(txInfo *TxInfo) { - nodeShardID := txInfo.dataNodes[txInfo.shardID].Consensus.ShardID - // Add the utxo as new tx input - txin := blockchain.NewTXInput(blockchain.NewOutPoint(&txInfo.id, txInfo.index), txInfo.address, nodeShardID) - - // Spend the utxo to a random address in [0 - N) - txout := blockchain.TXOutput{Amount: txInfo.value, Address: pki.GetAddressFromInt(rand.Intn(setting.numOfAddress) + 1), ShardID: nodeShardID} - tx := blockchain.Transaction{ID: [32]byte{}, TxInput: []blockchain.TXInput{*txin}, TxOutput: []blockchain.TXOutput{txout}, Proofs: nil} - - priKeyInt, ok := client.LookUpIntPriKey(txInfo.address) - if ok { - tx.PublicKey = pki.GetBytesFromPublicKey(pki.GetPublicKeyFromScalar(pki.GetPrivateKeyScalarFromInt(priKeyInt))) - tx.SetID() // TODO(RJ): figure out the correct way to set Tx ID. - tx.Sign(pki.GetPrivateKeyScalarFromInt(priKeyInt)) - } else { - log.Error("Failed to look up the corresponding private key from address", "Address", txInfo.address) - return - } - - txInfo.txs = append(txInfo.txs, &tx) - txInfo.txCount++ -} - func printVersion(me string) { fmt.Fprintf(os.Stderr, "Harmony (C) 2018. %v, version %v-%v (%v %v)\n", path.Base(me), version, commit, builtBy, builtAt) os.Exit(0) } func main() { + accountModel := flag.Bool("account_model", true, "Whether to use account model") configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config") maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 20000, "number of transactions to send per message") logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution") numSubset := flag.Int("numSubset", 3, "the number of subsets of utxos to process separately") - duration := flag.Int("duration", 60, "duration of the tx generation in second. If it's negative, the experiment runs forever.") + duration := flag.Int("duration", 10, "duration of the tx generation in second. If it's negative, the experiment runs forever.") versionFlag := flag.Bool("version", false, "Output version info") crossShardRatio := flag.Int("cross_shard_ratio", 30, "The percentage of cross shard transactions.") flag.Parse() @@ -269,11 +58,13 @@ func main() { config.ReadConfigFile(*configFile) shardIDLeaderMap := config.GetShardIDToLeaderMap() - setting.numOfAddress = 10000 // Do cross shard tx if there are more than one shard - setting.crossShard = len(shardIDLeaderMap) > 1 - setting.maxNumTxsPerBatch = *maxNumTxsPerBatch - setting.crossShardRatio = *crossShardRatio + setting := txgen.TxGenSettings{ + 10000, + len(shardIDLeaderMap) > 1, + *maxNumTxsPerBatch, + *crossShardRatio, + } // TODO(Richard): refactor this chuck to a single method // Setup a logger to stdout and log file. @@ -289,7 +80,7 @@ func main() { for shardID := range shardIDLeaderMap { node := node.New(&consensus.Consensus{ShardID: shardID}, nil) // Assign many fake addresses so we have enough address to play with at first - node.AddTestingAddresses(setting.numOfAddress) + node.AddTestingAddresses(setting.NumOfAddress) nodes = append(nodes, node) } @@ -313,6 +104,19 @@ func main() { utxoPoolMutex.Lock() node.UpdateUtxoAndState(block) utxoPoolMutex.Unlock() + + accountBlock := new(types.Block) + err := rlp.DecodeBytes(block.AccountBlock, accountBlock) + fmt.Println("RECEIVED NEW BLOCK ", len(accountBlock.Transactions())) + if err != nil { + log.Error("Failed decoding the block with RLP") + } else { + err = node.Worker.CommitTransactions(accountBlock.Transactions(), accountBlock.Coinbase()) + node.Worker.UpdateCurrent() + if err != nil { + log.Debug("Failed to add new block to worker", "Error", err) + } + } } else { continue } @@ -335,57 +139,100 @@ func main() { client.InitLookUpIntPriKeyMap() subsetCounter := 0 - for { - t := time.Now() - if totalTime > 0 && t.Sub(start).Seconds() >= totalTime { - log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime) - break + if *accountModel { + for { + t := time.Now() + if totalTime > 0 && t.Sub(start).Seconds() >= totalTime { + log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime) + break + } + shardIDTxsMap := make(map[uint32]types.Transactions) + lock := sync.Mutex{} + var wg sync.WaitGroup + wg.Add(len(shardIDLeaderMap)) + + utxoPoolMutex.Lock() + log.Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0)) + for shardID, _ := range shardIDLeaderMap { // Generate simulated transactions + go func(shardID uint32) { + txs, _ := txgen.GenerateSimulatedTransactionsAccount(int(shardID), nodes, setting) + + // TODO: Put cross shard tx into a pending list waiting for proofs from leaders + + lock.Lock() + // Put txs into corresponding shards + shardIDTxsMap[shardID] = append(shardIDTxsMap[shardID], txs...) + lock.Unlock() + wg.Done() + }(shardID) + } + wg.Wait() + utxoPoolMutex.Unlock() + + lock.Lock() + for shardID, txs := range shardIDTxsMap { // Send the txs to corresponding shards + go func(shardID uint32, txs types.Transactions) { + SendTxsToLeaderAccount(shardIDLeaderMap[shardID], txs) + }(shardID, txs) + } + lock.Unlock() + + subsetCounter++ + time.Sleep(10000 * time.Millisecond) } - shardIDTxsMap := make(map[uint32][]*blockchain.Transaction) - lock := sync.Mutex{} - var wg sync.WaitGroup - wg.Add(len(shardIDLeaderMap)) - - utxoPoolMutex.Lock() - log.Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0)) - for shardID, _ := range shardIDLeaderMap { // Generate simulated transactions - go func(shardID uint32) { - txs, crossTxs := generateSimulatedTransactions(subsetCounter, *numSubset, int(shardID), nodes) - - // Put cross shard tx into a pending list waiting for proofs from leaders - if clientPort != "" { - clientNode.Client.PendingCrossTxsMutex.Lock() - for _, tx := range crossTxs { - clientNode.Client.PendingCrossTxs[tx.ID] = tx + } else { + for { + t := time.Now() + if totalTime > 0 && t.Sub(start).Seconds() >= totalTime { + log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime) + break + } + shardIDTxsMap := make(map[uint32][]*blockchain.Transaction) + lock := sync.Mutex{} + var wg sync.WaitGroup + wg.Add(len(shardIDLeaderMap)) + + utxoPoolMutex.Lock() + log.Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0)) + for shardID, _ := range shardIDLeaderMap { // Generate simulated transactions + go func(shardID uint32) { + txs, crossTxs := txgen.GenerateSimulatedTransactions(subsetCounter, *numSubset, int(shardID), nodes, setting) + + // Put cross shard tx into a pending list waiting for proofs from leaders + if clientPort != "" { + clientNode.Client.PendingCrossTxsMutex.Lock() + for _, tx := range crossTxs { + clientNode.Client.PendingCrossTxs[tx.ID] = tx + } + clientNode.Client.PendingCrossTxsMutex.Unlock() } - clientNode.Client.PendingCrossTxsMutex.Unlock() - } - lock.Lock() - // Put txs into corresponding shards - shardIDTxsMap[shardID] = append(shardIDTxsMap[shardID], txs...) - for _, crossTx := range crossTxs { - for curShardID, _ := range client.GetInputShardIDsOfCrossShardTx(crossTx) { - shardIDTxsMap[curShardID] = append(shardIDTxsMap[curShardID], crossTx) + lock.Lock() + // Put txs into corresponding shards + shardIDTxsMap[shardID] = append(shardIDTxsMap[shardID], txs...) + for _, crossTx := range crossTxs { + for curShardID, _ := range client.GetInputShardIDsOfCrossShardTx(crossTx) { + shardIDTxsMap[curShardID] = append(shardIDTxsMap[curShardID], crossTx) + } } - } - lock.Unlock() - wg.Done() - }(shardID) - } - wg.Wait() - utxoPoolMutex.Unlock() - - lock.Lock() - for shardID, txs := range shardIDTxsMap { // Send the txs to corresponding shards - go func(shardID uint32, txs []*blockchain.Transaction) { - SendTxsToLeader(shardIDLeaderMap[shardID], txs) - }(shardID, txs) - } - lock.Unlock() + lock.Unlock() + wg.Done() + }(shardID) + } + wg.Wait() + utxoPoolMutex.Unlock() + + lock.Lock() + for shardID, txs := range shardIDTxsMap { // Send the txs to corresponding shards + go func(shardID uint32, txs []*blockchain.Transaction) { + SendTxsToLeader(shardIDLeaderMap[shardID], txs) + }(shardID, txs) + } + lock.Unlock() - subsetCounter++ - time.Sleep(10000 * time.Millisecond) + subsetCounter++ + time.Sleep(10000 * time.Millisecond) + } } // Send a stop message to stop the nodes at the end @@ -400,3 +247,9 @@ func SendTxsToLeader(leader p2p.Peer, txs []*blockchain.Transaction) { msg := proto_node.ConstructTransactionListMessage(txs) p2p.SendMessage(leader, msg) } + +func SendTxsToLeaderAccount(leader p2p.Peer, txs types.Transactions) { + log.Debug("[Generator] Sending account-based txs to...", "leader", leader, "numTxs", len(txs)) + msg := proto_node.ConstructTransactionListMessageAccount(txs) + p2p.SendMessage(leader, msg) +} diff --git a/client/txgen/txgen/account_txs_generator.go b/client/txgen/txgen/account_txs_generator.go new file mode 100644 index 000000000..4cab3ed2d --- /dev/null +++ b/client/txgen/txgen/account_txs_generator.go @@ -0,0 +1,32 @@ +package txgen + +import ( + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethereum/go-ethereum/params" + "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/node" + "math/big" +) + +type TxGenSettings struct { + NumOfAddress int + CrossShard bool + MaxNumTxsPerBatch int + CrossShardRatio int +} + +func GenerateSimulatedTransactionsAccount(shardID int, dataNodes []*node.Node, setting TxGenSettings) (types.Transactions, types.Transactions) { + _ = setting // TODO: take use of settings + node := dataNodes[shardID] + txs := make([]*types.Transaction, 1000) + for i := 0; i < 100; i++ { + baseNonce := node.Worker.GetCurrentState().GetNonce(crypto.PubkeyToAddress(node.TestBankKeys[i].PublicKey)) + for j := 0; j < 10; j++ { + randomUserKey, _ := crypto.GenerateKey() + randomUserAddress := crypto.PubkeyToAddress(randomUserKey.PublicKey) + tx, _ := types.SignTx(types.NewTransaction(baseNonce+uint64(j), randomUserAddress, big.NewInt(1000), params.TxGas, nil, nil), types.HomesteadSigner{}, node.TestBankKeys[i]) + txs[i*10+j] = tx + } + } + return txs, nil +} diff --git a/client/txgen/txgen/utxo_txs_generator.go b/client/txgen/txgen/utxo_txs_generator.go new file mode 100644 index 000000000..2ed9841f0 --- /dev/null +++ b/client/txgen/txgen/utxo_txs_generator.go @@ -0,0 +1,212 @@ +package txgen + +import ( + "encoding/binary" + "encoding/hex" + "github.com/harmony-one/harmony/blockchain" + "github.com/harmony-one/harmony/client" + "github.com/harmony-one/harmony/crypto/pki" + "github.com/harmony-one/harmony/log" + "github.com/harmony-one/harmony/node" + "math/rand" +) + +type TxInfo struct { + // Global Input + shardID int + dataNodes []*node.Node + // Temp Input + id [32]byte + index uint32 + value int + address [20]byte + // Output + txs []*blockchain.Transaction + crossTxs []*blockchain.Transaction + txCount int +} + +// Generates at most "maxNumTxs" number of simulated transactions based on the current UtxoPools of all shards. +// The transactions are generated by going through the existing utxos and +// randomly select a subset of them as the input for each new transaction. The output +// address of the new transaction are randomly selected from [0 - N), where N is the total number of fake addresses. +// +// When crossShard=true, besides the selected utxo input, select another valid utxo as input from the same address in a second shard. +// Similarly, generate another utxo output in that second shard. +// +// NOTE: the genesis block should contain N coinbase transactions which add +// token (1000) to each address in [0 - N). See node.AddTestingAddresses() +// +// Params: +// subsetId - the which subset of the utxo to work on (used to select addresses) +// shardID - the shardID for current shard +// dataNodes - nodes containing utxopools of all shards +// Returns: +// all single-shard txs +// all cross-shard txs +func GenerateSimulatedTransactions(subsetId, numSubset int, shardID int, dataNodes []*node.Node, setting TxGenSettings) ([]*blockchain.Transaction, []*blockchain.Transaction) { + /* + UTXO map structure: + address - [ + txID1 - [ + outputIndex1 - value1 + outputIndex2 - value2 + ] + txID2 - [ + outputIndex1 - value1 + outputIndex2 - value2 + ] + ] + */ + + txInfo := TxInfo{} + txInfo.shardID = shardID + txInfo.dataNodes = dataNodes + txInfo.txCount = 0 + +UTXOLOOP: + // Loop over all addresses + for address, txMap := range dataNodes[shardID].UtxoPool.UtxoMap { + if int(binary.BigEndian.Uint32(address[:]))%numSubset == subsetId%numSubset { // Work on one subset of utxo at a time + txInfo.address = address + // Loop over all txIDs for the address + for txIDStr, utxoMap := range txMap { + // Parse TxId + id, err := hex.DecodeString(txIDStr) + if err != nil { + continue + } + copy(txInfo.id[:], id[:]) + + // Loop over all utxos for the txID + utxoSize := len(utxoMap) + batchSize := utxoSize / numSubset + i := subsetId % numSubset + counter := 0 + for index, value := range utxoMap { + counter++ + if batchSize*i < counter && counter > batchSize*(i+1) { + continue + } + txInfo.index = index + txInfo.value = value + + randNum := rand.Intn(100) + + subsetRatio := 100 // / numSubset + if randNum < subsetRatio { // Sample based on batch size + if setting.CrossShard && randNum < subsetRatio*setting.CrossShardRatio/100 { // 30% cross shard transactions: add another txinput from another shard + generateCrossShardTx(&txInfo, setting) + } else { + generateSingleShardTx(&txInfo, setting) + } + if txInfo.txCount >= setting.MaxNumTxsPerBatch { + break UTXOLOOP + } + } + } + } + } + } + log.Info("UTXO CLIENT", "numUtxo", dataNodes[shardID].UtxoPool.CountNumOfUtxos(), "shardID", shardID) + log.Debug("[Generator] generated transations", "single-shard", len(txInfo.txs), "cross-shard", len(txInfo.crossTxs)) + return txInfo.txs, txInfo.crossTxs +} + +func generateCrossShardTx(txInfo *TxInfo, setting TxGenSettings) { + nodeShardID := txInfo.dataNodes[txInfo.shardID].Consensus.ShardID + crossShardID := nodeShardID + // a random shard to spend money to + for { + crossShardID = uint32(rand.Intn(len(txInfo.dataNodes))) + if crossShardID != nodeShardID { + break + } + } + + //crossShardNode := txInfo.dataNodes[crossShardID] + //crossShardUtxosMap := crossShardNode.UtxoPool.UtxoMap[txInfo.address] + // + //// Get the cross shard utxo from another shard + //var crossTxin *blockchain.TXInput + //crossUtxoValue := 0 + //// Loop over utxos for the same address from the other shard and use the first utxo as the second cross tx input + //for crossTxIdStr, crossShardUtxos := range crossShardUtxosMap { + // // Parse TxId + // id, err := hex.DecodeString(crossTxIdStr) + // if err != nil { + // continue + // } + // crossTxId := [32]byte{} + // copy(crossTxId[:], id[:]) + // + // for crossShardIndex, crossShardValue := range crossShardUtxos { + // crossUtxoValue = crossShardValue + // crossTxin = blockchain.NewTXInput(blockchain.NewOutPoint(&crossTxId, crossShardIndex), txInfo.address, crossShardID) + // break + // } + // if crossTxin != nil { + // break + // } + //} + + // Add the utxo from current shard + txIn := blockchain.NewTXInput(blockchain.NewOutPoint(&txInfo.id, txInfo.index), txInfo.address, nodeShardID) + txInputs := []blockchain.TXInput{*txIn} + + // Add the utxo from the other shard, if any + //if crossTxin != nil { // This means the ratio of cross shard tx could be lower than 1/3 + // txInputs = append(txInputs, *crossTxin) + //} + + // Spend the utxo from the current shard to a random address in [0 - N) + txout := blockchain.TXOutput{Amount: txInfo.value, Address: pki.GetAddressFromInt(rand.Intn(setting.NumOfAddress) + 1), ShardID: crossShardID} + + txOutputs := []blockchain.TXOutput{txout} + + // Spend the utxo from the other shard, if any, to a random address in [0 - N) + //if crossTxin != nil { + // crossTxout := blockchain.TXOutput{Amount: crossUtxoValue, Address: pki.GetAddressFromInt(rand.Intn(setting.numOfAddress) + 1), ShardID: crossShardID} + // txOutputs = append(txOutputs, crossTxout) + //} + + // Construct the new transaction + tx := blockchain.Transaction{ID: [32]byte{}, TxInput: txInputs, TxOutput: txOutputs, Proofs: nil} + + priKeyInt, ok := client.LookUpIntPriKey(txInfo.address) + if ok { + tx.PublicKey = pki.GetBytesFromPublicKey(pki.GetPublicKeyFromScalar(pki.GetPrivateKeyScalarFromInt(priKeyInt))) + + tx.SetID() // TODO(RJ): figure out the correct way to set Tx ID. + tx.Sign(pki.GetPrivateKeyScalarFromInt(priKeyInt)) + } else { + log.Error("Failed to look up the corresponding private key from address", "Address", txInfo.address) + return + } + + txInfo.crossTxs = append(txInfo.crossTxs, &tx) + txInfo.txCount++ +} + +func generateSingleShardTx(txInfo *TxInfo, setting TxGenSettings) { + nodeShardID := txInfo.dataNodes[txInfo.shardID].Consensus.ShardID + // Add the utxo as new tx input + txin := blockchain.NewTXInput(blockchain.NewOutPoint(&txInfo.id, txInfo.index), txInfo.address, nodeShardID) + + // Spend the utxo to a random address in [0 - N) + txout := blockchain.TXOutput{Amount: txInfo.value, Address: pki.GetAddressFromInt(rand.Intn(setting.NumOfAddress) + 1), ShardID: nodeShardID} + tx := blockchain.Transaction{ID: [32]byte{}, TxInput: []blockchain.TXInput{*txin}, TxOutput: []blockchain.TXOutput{txout}, Proofs: nil} + + priKeyInt, ok := client.LookUpIntPriKey(txInfo.address) + if ok { + tx.PublicKey = pki.GetBytesFromPublicKey(pki.GetPublicKeyFromScalar(pki.GetPrivateKeyScalarFromInt(priKeyInt))) + tx.SetID() // TODO(RJ): figure out the correct way to set Tx ID. + tx.Sign(pki.GetPrivateKeyScalarFromInt(priKeyInt)) + } else { + log.Error("Failed to look up the corresponding private key from address", "Address", txInfo.address) + return + } + + txInfo.txs = append(txInfo.txs, &tx) + txInfo.txCount++ +} diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 1a11d6070..9064afb14 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -61,6 +61,7 @@ func (consensus *Consensus) WaitForNewBlockAccount(blockChannel chan *types.Bloc // time.Sleep(500 * time.Millisecond) data, err := rlp.EncodeToBytes(newBlock) if err == nil { + consensus.Log.Debug("Sample tx", "tx", newBlock.Transactions()[0]) consensus.startConsensus(&blockchain.Block{Hash: newBlock.Hash(), AccountBlock: data}) } else { consensus.Log.Error("Failed encoding the block with RLP") diff --git a/core/blockchain.go b/core/blockchain.go index 99a9ad69f..c1dc87942 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -189,30 +189,26 @@ func NewBlockChain(db hdb.Database, cacheConfig *CacheConfig, chainConfig *param return bc, nil } -func (bc *BlockChain) ValidateNewBlock(block *types.Block, address common.Address) bool { +func (bc *BlockChain) ValidateNewBlock(block *types.Block, address common.Address) error { state, err := state.New(bc.CurrentBlock().Root(), bc.stateCache) - fmt.Println("WITHIN NNNNNNNNNNNNNN", err) if err != nil { - return false + return err } - fmt.Println("Balance 3 ", state.GetBalance(address)) // Process block using the parent state as reference point. receipts, _, usedGas, err := bc.processor.Process(block, state, bc.vmConfig) if err != nil { bc.reportBlock(block, receipts, err) - return false + return err } - fmt.Println("WITHIN NNNNNNNNNNNNNN2", err) err = bc.Validator().ValidateState(block, bc.CurrentBlock(), state, receipts, usedGas) if err != nil { bc.reportBlock(block, receipts, err) - return false + return err } - fmt.Println("WITHIN NNNNNNNNNNNNNN3", err) - return true + return nil } func (bc *BlockChain) getProcInterrupt() bool { diff --git a/core/genesis.go b/core/genesis.go index e03323a6f..f73900b8d 100644 --- a/core/genesis.go +++ b/core/genesis.go @@ -115,7 +115,6 @@ func (h *storageJSON) UnmarshalText(text []byte) error { } offset := len(h) - len(text)/2 // pad on the left if _, err := hex.Decode(h[offset:], text); err != nil { - fmt.Println(err) return fmt.Errorf("invalid hex storage key/value %q", text) } return nil @@ -246,7 +245,7 @@ func (g *Genesis) ToBlock(db hdb.Database) *types.Block { Root: root, } if g.GasLimit == 0 { - head.GasLimit = params.GenesisGasLimit + head.GasLimit = 10000000000 // TODO(RJ): figure out better solution. // params.GenesisGasLimit } if g.Difficulty == nil { head.Difficulty = params.GenesisDifficulty diff --git a/discovery/discovery.go b/discovery/discovery.go index bbc234d0d..7ed3f1023 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -16,18 +16,18 @@ type ConfigEntry struct { leader p2p.Peer self p2p.Peer peers []p2p.Peer - pubK kyber.Scalar - priK kyber.Point + priK kyber.Scalar + pubK kyber.Point } func (config ConfigEntry) String() string { return fmt.Sprintf("idc: %v:%v", config.IP, config.Port) } -func New(pubK kyber.Scalar, priK kyber.Point) *ConfigEntry { +func New(priK kyber.Scalar, pubK kyber.Point) *ConfigEntry { var config ConfigEntry - config.pubK = pubK config.priK = priK + config.pubK = pubK config.peers = make([]p2p.Peer, 0) diff --git a/harmony/main.go b/harmony/main.go index f17d4d982..99ca260d5 100644 --- a/harmony/main.go +++ b/harmony/main.go @@ -84,7 +84,6 @@ func main() { txs := make([]*types.Transaction, 100) worker := worker.New(params.TestChainConfig, chain, consensus.NewFaker()) fmt.Println(worker.GetCurrentState().GetBalance(testBankAddress)) - fmt.Println(worker.Commit().Root()) for i, _ := range txs { randomUserKey, _ := crypto.GenerateKey() @@ -96,5 +95,4 @@ func main() { worker.CommitTransactions(txs, crypto.PubkeyToAddress(testBankKey.PublicKey)) fmt.Println(worker.GetCurrentState().GetBalance(testBankAddress)) - fmt.Println(worker.Commit().Root()) } diff --git a/node/node.go b/node/node.go index 8563f0470..224eb53c8 100644 --- a/node/node.go +++ b/node/node.go @@ -6,6 +6,7 @@ import ( "encoding/gob" "fmt" "math/big" + "math/rand" "net" "strings" "sync" @@ -73,16 +74,18 @@ type Node struct { State NodeState // State of the Node // Account Model - Chain *core.BlockChain - TxPool *core.TxPool - BlockChannelAccount chan *types.Block // The channel to receive new blocks from Node - worker *worker.Worker + pendingTransactionsAccount types.Transactions // TODO: replace with txPool + pendingTxMutexAccount sync.Mutex + Chain *core.BlockChain + TxPool *core.TxPool + BlockChannelAccount chan *types.Block // The channel to receive new blocks from Node + Worker *worker.Worker // Syncing component. downloaderServer *downloader.Server // Test only - testBankKey *ecdsa.PrivateKey + TestBankKeys []*ecdsa.PrivateKey } // Add new crossTx and proofs to the list of crossTx that needs to be sent back to client @@ -101,6 +104,14 @@ func (node *Node) addPendingTransactions(newTxs []*blockchain.Transaction) { node.log.Debug("Got more transactions", "num", len(newTxs), "totalPending", len(node.pendingTransactions), "node", node) } +// Add new transactions to the pending transaction list +func (node *Node) addPendingTransactionsAccount(newTxs types.Transactions) { + node.pendingTxMutexAccount.Lock() + node.pendingTransactionsAccount = append(node.pendingTransactionsAccount, newTxs...) + node.pendingTxMutexAccount.Unlock() + node.log.Debug("Got more transactions (account model)", "num", len(newTxs), "totalPending", len(node.pendingTransactionsAccount), "node", node) +} + // Take out a subset of valid transactions from the pending transaction list // Note the pending transaction list will then contain the rest of the txs func (node *Node) getTransactionsForNewBlock(maxNumTxs int) ([]*blockchain.Transaction, []*blockchain.CrossShardTxAndProof) { @@ -114,6 +125,20 @@ func (node *Node) getTransactionsForNewBlock(maxNumTxs int) ([]*blockchain.Trans return selected, crossShardTxs } +// Take out a subset of valid transactions from the pending transaction list +// Note the pending transaction list will then contain the rest of the txs +func (node *Node) getTransactionsForNewBlockAccount(maxNumTxs int) (types.Transactions, []*blockchain.CrossShardTxAndProof) { + node.pendingTxMutexAccount.Lock() + selected, unselected, invalid, crossShardTxs := node.pendingTransactionsAccount, types.Transactions{}, types.Transactions{}, []*blockchain.CrossShardTxAndProof{} + _ = invalid // invalid txs are discard + + node.log.Debug("Invalid transactions discarded", "number", len(invalid)) + node.pendingTransactionsAccount = unselected + node.log.Debug("Remaining pending transactions", "number", len(node.pendingTransactionsAccount)) + node.pendingTxMutexAccount.Unlock() + return selected, crossShardTxs //TODO: replace cross-shard proofs for account model +} + // StartServer starts a server and process the request by a handler. func (node *Node) StartServer(port string) { if node.SyncNode { @@ -169,6 +194,17 @@ func (node *Node) countNumTransactionsInBlockchain() int { return count } +// Count the total number of transactions in the blockchain +// Currently used for stats reporting purpose +func (node *Node) countNumTransactionsInBlockchainAccount() int { + count := 0 + for curBlock := node.Chain.CurrentBlock(); curBlock != nil; { + count += len(curBlock.Transactions()) + curBlock = node.Chain.GetBlockByHash(curBlock.ParentHash()) + } + return count +} + //ConnectIdentityChain connects to identity chain func (node *Node) ConnectBeaconChain() { Nnode := &NetworkNode{SelfPeer: node.SelfPeer, IDCPeer: node.IDCPeer} @@ -230,14 +266,26 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase) *Node { node.db = db // (account model) + rand.Seed(0) + len := 1000000 + bytes := make([]byte, len) + for i := 0; i < len; i++ { + bytes[i] = byte(rand.Intn(100)) + } + reader := strings.NewReader(string(bytes)) + genesisAloc := make(core.GenesisAlloc) + for i := 0; i < 100; i++ { + testBankKey, _ := ecdsa.GenerateKey(crypto.S256(), reader) + testBankAddress := crypto.PubkeyToAddress(testBankKey.PublicKey) + testBankFunds := big.NewInt(10000000000) + genesisAloc[testBankAddress] = core.GenesisAccount{Balance: testBankFunds} + node.TestBankKeys = append(node.TestBankKeys, testBankKey) + } - node.testBankKey, _ = ecdsa.GenerateKey(crypto.S256(), strings.NewReader("Fixed source of randomnessasdffffffffffffffffffffffffffffffffffffffffsdffffffffffffffffffffffffffffffffffffffffffffffffffffff")) - testBankAddress := crypto.PubkeyToAddress(node.testBankKey.PublicKey) - testBankFunds := big.NewInt(1000000000000000000) database := hdb.NewMemDatabase() gspec := core.Genesis{ Config: params.TestChainConfig, - Alloc: core.GenesisAlloc{testBankAddress: {Balance: testBankFunds}}, + Alloc: genesisAloc, } _ = gspec.MustCommit(database) @@ -246,11 +294,7 @@ func New(consensus *bft.Consensus, db *hdb.LDBDatabase) *Node { node.Chain = chain node.TxPool = core.NewTxPool(core.DefaultTxPoolConfig, params.TestChainConfig, chain) node.BlockChannelAccount = make(chan *types.Block) - node.worker = worker.New(params.TestChainConfig, chain, bft.NewFaker()) - - fmt.Println("BALANCE") - fmt.Println(node.worker.GetCurrentState().GetBalance(testBankAddress)) - + node.Worker = worker.New(params.TestChainConfig, chain, bft.NewFaker()) } // Logger node.log = log.New() diff --git a/node/node_handler.go b/node/node_handler.go index 8426ee7bb..e5a447b8b 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -5,17 +5,14 @@ import ( "bytes" "encoding/gob" "fmt" - "math/big" + "github.com/ethereum/go-ethereum/rlp" + "github.com/harmony-one/harmony/core/types" + "github.com/harmony-one/harmony/crypto/pki" "net" "os" "strconv" "time" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/params" - "github.com/ethereum/go-ethereum/rlp" - "github.com/harmony-one/harmony/core/types" - "github.com/harmony-one/harmony/blockchain" hmy_crypto "github.com/harmony-one/harmony/crypto" "github.com/harmony-one/harmony/p2p" @@ -143,45 +140,49 @@ func (node *Node) NodeHandler(conn net.Conn) { node.log.Info("NET: received message: Node/Control") controlType := msgPayload[0] if proto_node.ControlMessageType(controlType) == proto_node.STOP { - node.log.Debug("Stopping Node", "node", node, "numBlocks", len(node.blockchain.Blocks), "numTxsProcessed", node.countNumTransactionsInBlockchain()) - - sizeInBytes := node.UtxoPool.GetSizeInByteOfUtxoMap() - node.log.Debug("UtxoPool Report", "numEntries", len(node.UtxoPool.UtxoMap), "sizeInBytes", sizeInBytes) - - avgBlockSizeInBytes := 0 - txCount := 0 - blockCount := 0 - totalTxCount := 0 - totalBlockCount := 0 - avgTxSize := 0 - - for _, block := range node.blockchain.Blocks { - if block.IsStateBlock() { - totalTxCount += int(block.State.NumTransactions) - totalBlockCount += int(block.State.NumBlocks) - } else { - byteBuffer := bytes.NewBuffer([]byte{}) - encoder := gob.NewEncoder(byteBuffer) - encoder.Encode(block) - avgBlockSizeInBytes += len(byteBuffer.Bytes()) - - txCount += len(block.Transactions) - blockCount++ - totalTxCount += len(block.TransactionIds) - totalBlockCount++ - - byteBuffer = bytes.NewBuffer([]byte{}) - encoder = gob.NewEncoder(byteBuffer) - encoder.Encode(block.Transactions) - avgTxSize += len(byteBuffer.Bytes()) + if node.Chain == nil { + node.log.Debug("Stopping Node", "node", node, "numBlocks", len(node.blockchain.Blocks), "numTxsProcessed", node.countNumTransactionsInBlockchain()) + + sizeInBytes := node.UtxoPool.GetSizeInByteOfUtxoMap() + node.log.Debug("UtxoPool Report", "numEntries", len(node.UtxoPool.UtxoMap), "sizeInBytes", sizeInBytes) + + avgBlockSizeInBytes := 0 + txCount := 0 + blockCount := 0 + totalTxCount := 0 + totalBlockCount := 0 + avgTxSize := 0 + + for _, block := range node.blockchain.Blocks { + if block.IsStateBlock() { + totalTxCount += int(block.State.NumTransactions) + totalBlockCount += int(block.State.NumBlocks) + } else { + byteBuffer := bytes.NewBuffer([]byte{}) + encoder := gob.NewEncoder(byteBuffer) + encoder.Encode(block) + avgBlockSizeInBytes += len(byteBuffer.Bytes()) + + txCount += len(block.Transactions) + blockCount++ + totalTxCount += len(block.TransactionIds) + totalBlockCount++ + + byteBuffer = bytes.NewBuffer([]byte{}) + encoder = gob.NewEncoder(byteBuffer) + encoder.Encode(block.Transactions) + avgTxSize += len(byteBuffer.Bytes()) + } + } + if blockCount != 0 { + avgBlockSizeInBytes = avgBlockSizeInBytes / blockCount + avgTxSize = avgTxSize / txCount } - } - if blockCount != 0 { - avgBlockSizeInBytes = avgBlockSizeInBytes / blockCount - avgTxSize = avgTxSize / txCount - } - node.log.Debug("Blockchain Report", "totalNumBlocks", totalBlockCount, "avgBlockSizeInCurrentEpoch", avgBlockSizeInBytes, "totalNumTxs", totalTxCount, "avgTxSzieInCurrentEpoch", avgTxSize) + node.log.Debug("Blockchain Report", "totalNumBlocks", totalBlockCount, "avgBlockSizeInCurrentEpoch", avgBlockSizeInBytes, "totalNumTxs", totalTxCount, "avgTxSzieInCurrentEpoch", avgTxSize) + } else { + node.log.Debug("Stopping Node (Account Model)", "node", node, "CurBlockNum", node.Chain.CurrentHeader().Number, "numTxsProcessed", node.countNumTransactionsInBlockchainAccount()) + } os.Exit(0) } @@ -260,13 +261,23 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) { switch txMessageType { case proto_node.Send: - txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the Send messge type - txList := new([]*blockchain.Transaction) - err := txDecoder.Decode(txList) - if err != nil { - node.log.Error("Failed to deserialize transaction list", "error", err) + if node.Chain != nil { + txs := types.Transactions{} + err := rlp.Decode(bytes.NewReader(msgPayload[1:]), &txs) // skip the Send messge type + if err != nil { + node.log.Error("Failed to deserialize transaction list", "error", err) + } + node.addPendingTransactionsAccount(txs) + } else { + txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the Send messge type + txList := new([]*blockchain.Transaction) + err := txDecoder.Decode(&txList) + if err != nil { + node.log.Error("Failed to deserialize transaction list", "error", err) + } + node.addPendingTransactions(*txList) } - node.addPendingTransactions(*txList) + case proto_node.Request: reader := bytes.NewBuffer(msgPayload[1:]) txIDs := make(map[[32]byte]bool) @@ -379,21 +390,31 @@ func (node *Node) WaitForConsensusReadyAccount(readySignal chan struct{}) { } if !retry { - // Normal tx block consensus - // TODO: add new block generation logic - txs := make([]*types.Transaction, 100) - for i, _ := range txs { - randomUserKey, _ := crypto.GenerateKey() - randomUserAddress := crypto.PubkeyToAddress(randomUserKey.PublicKey) - tx, _ := types.SignTx(types.NewTransaction(node.worker.GetCurrentState().GetNonce(crypto.PubkeyToAddress(node.testBankKey.PublicKey)), randomUserAddress, big.NewInt(1000), params.TxGas, nil, nil), types.HomesteadSigner{}, node.testBankKey) - txs[i] = tx + for { + if len(node.pendingTransactionsAccount) >= 1000 { + // Normal tx block consensus + selectedTxs, _ := node.getTransactionsForNewBlockAccount(MaxNumberOfTransactionsPerBlock) + err := node.Worker.UpdateCurrent() + if err != nil { + node.log.Debug("Failed updating worker's state", "Error", err) + } + err = node.Worker.CommitTransactions(selectedTxs, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey)) + if err == nil { + block, err := node.Worker.Commit() + if err != nil { + node.log.Debug("Failed commiting new block", "Error", err) + } else { + newBlock = block + break + } + } else { + node.log.Debug("Failed to create new block", "Error", err) + } + } + // If not enough transactions to run Consensus, + // periodically check whether we have enough transactions to package into block. + time.Sleep(1 * time.Second) } - node.worker.CommitTransactions(txs, crypto.PubkeyToAddress(node.testBankKey.PublicKey)) - newBlock = node.worker.Commit() - - // If not enough transactions to run Consensus, - // periodically check whether we have enough transactions to package into block. - time.Sleep(1 * time.Second) } // Send the new block to Consensus so it can be confirmed. @@ -446,11 +467,12 @@ func (node *Node) VerifyNewBlock(newBlock *blockchain.Block) bool { // VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on func (node *Node) VerifyNewBlockAccount(newBlock *types.Block) bool { - fmt.Println("VerifyingNNNNNNNNNNNNNN") - - fmt.Println("BALANCE 1") - fmt.Println(node.worker.GetCurrentState().GetBalance(crypto.PubkeyToAddress(node.testBankKey.PublicKey))) - return node.Chain.ValidateNewBlock(newBlock, crypto.PubkeyToAddress(node.testBankKey.PublicKey)) + err := node.Chain.ValidateNewBlock(newBlock, pki.GetAddressFromPublicKey(node.SelfPeer.PubKey)) + if err != nil { + node.log.Debug("Failed verifying new block", "Error", err, "tx", newBlock.Transactions()[0]) + return false + } + return true } // PostConsensusProcessing is called by consensus participants, after consensus is done, to: @@ -483,11 +505,26 @@ func (node *Node) PostConsensusProcessing(newBlock *blockchain.Block) { node.BroadcastNewBlock(newBlock) } + accountBlock := new(types.Block) + err := rlp.DecodeBytes(newBlock.AccountBlock, accountBlock) + if err != nil { + node.log.Error("Failed decoding the block with RLP") + } + node.AddNewBlock(newBlock) node.UpdateUtxoAndState(newBlock) + +} + +// AddNewBlockAccount is usedd to add new block into the blockchain. +func (node *Node) AddNewBlockAccount(newBlock *types.Block) { + num, err := node.Chain.InsertChain([]*types.Block{newBlock}) + if err != nil { + node.log.Debug("Error adding to chain", "numBlocks", num, "Error", err) + } } -// AddNewBlock is usedd to add new block into the blockchain. +// AddNewBlock is usedd to add new block into the utxo-based blockchain. func (node *Node) AddNewBlock(newBlock *blockchain.Block) { // Add it to blockchain node.blockchain.Blocks = append(node.blockchain.Blocks, newBlock) @@ -496,6 +533,14 @@ func (node *Node) AddNewBlock(newBlock *blockchain.Block) { node.log.Info("Writing new block into disk.") newBlock.Write(node.db, strconv.Itoa(len(node.blockchain.Blocks))) } + + // Account model + accountBlock := new(types.Block) + err := rlp.DecodeBytes(newBlock.AccountBlock, accountBlock) + if err != nil { + node.log.Error("Failed decoding the block with RLP") + } + node.AddNewBlockAccount(accountBlock) } // UpdateUtxoAndState updates Utxo and state. diff --git a/node/worker/worker.go b/node/worker/worker.go index 12ce4c71b..403408107 100644 --- a/node/worker/worker.go +++ b/node/worker/worker.go @@ -1,9 +1,6 @@ package worker import ( - "math/big" - "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/params" "github.com/harmony-one/harmony/consensus" @@ -11,6 +8,8 @@ import ( "github.com/harmony-one/harmony/core/state" "github.com/harmony-one/harmony/core/types" "github.com/harmony-one/harmony/core/vm" + "math/big" + "time" ) // environment is the worker's current environment and holds all of the current state information. @@ -50,14 +49,34 @@ func (w *Worker) commitTransaction(tx *types.Transaction, coinbase common.Addres return receipt.Logs, nil } -func (w *Worker) CommitTransactions(txs []*types.Transaction, coinbase common.Address) { +func (w *Worker) CommitTransactions(txs []*types.Transaction, coinbase common.Address) error { + snap := w.current.state.Snapshot() + if w.current.gasPool == nil { w.current.gasPool = new(core.GasPool).AddGas(w.current.header.GasLimit) } - for _, tx := range txs { - w.commitTransaction(tx, coinbase) + _, err := w.commitTransaction(tx, coinbase) + if err != nil { + w.current.state.RevertToSnapshot(snap) + return err + + } + } + return nil +} + +func (w *Worker) UpdateCurrent() error { + parent := w.chain.CurrentBlock() + num := parent.Number() + timestamp := time.Now().Unix() + header := &types.Header{ + ParentHash: parent.Hash(), + Number: num.Add(num, common.Big1), + GasLimit: core.CalcGasLimit(parent, w.gasFloor, w.gasCeil), + Time: big.NewInt(timestamp), } + return w.makeCurrent(parent, header) } // makeCurrent creates a new environment for the current cycle. @@ -79,13 +98,13 @@ func (w *Worker) GetCurrentState() *state.StateDB { return w.current.state } -func (w *Worker) Commit() *types.Block { +func (w *Worker) Commit() (*types.Block, error) { s := w.current.state.Copy() block, err := w.engine.Finalize(w.chain, w.current.header, s, w.current.txs, w.current.receipts) if err != nil { - return nil + return nil, err } - return block + return block, nil } func New(config *params.ChainConfig, chain *core.BlockChain, engine consensus.Engine) *Worker { @@ -95,7 +114,7 @@ func New(config *params.ChainConfig, chain *core.BlockChain, engine consensus.En engine: engine, } worker.gasFloor = 0 - worker.gasCeil = 10000000 + worker.gasCeil = 1000000000000000 parent := worker.chain.CurrentBlock() num := parent.Number() diff --git a/proto/node/node.go b/proto/node/node.go index 5e2f86ced..00f36329a 100644 --- a/proto/node/node.go +++ b/proto/node/node.go @@ -3,6 +3,9 @@ package node import ( "bytes" "encoding/gob" + "fmt" + "github.com/ethereum/go-ethereum/rlp" + "github.com/harmony-one/harmony/core/types" "log" "github.com/harmony-one/harmony/blockchain" @@ -144,6 +147,21 @@ func ConstructTransactionListMessage(transactions []*blockchain.Transaction) []b return byteBuffer.Bytes() } +// ConstructTransactionListMessageAccount constructs serialized transactions in account model +func ConstructTransactionListMessageAccount(transactions types.Transactions) []byte { + byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)}) + byteBuffer.WriteByte(byte(Transaction)) + byteBuffer.WriteByte(byte(Send)) + + txs, err := rlp.EncodeToBytes(transactions) + if err != nil { + fmt.Errorf("ERROR RLP %s", err) + return []byte{} // TODO(RJ): better handle of the error + } + byteBuffer.Write(txs) + return byteBuffer.Bytes() +} + // ConstructBlockchainSyncMessage constructs Blockchain Sync Message. func ConstructBlockchainSyncMessage(msgType BlockchainSyncMessageType, blockHash [32]byte) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(proto.Node)}) diff --git a/trie/database.go b/trie/database.go index fe8df2797..1cf5301b6 100644 --- a/trie/database.go +++ b/trie/database.go @@ -18,17 +18,14 @@ package trie import ( "fmt" - "io" - "sync" - "time" - - hdb "github.com/harmony-one/harmony/db" - "github.com/simple-rules/harmony-benchmark/db" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" + hdb "github.com/harmony-one/harmony/db" + "io" + "sync" + "time" ) var ( @@ -674,7 +671,7 @@ func (db *Database) Commit(node common.Hash, report bool) error { } // commit is the private locked version of Commit. -func (db *Database) commit(hash common.Hash, batch db.Batch) error { +func (db *Database) commit(hash common.Hash, batch hdb.Batch) error { // If the node does not exist, it's a previously committed node node, ok := db.nodes[hash] if !ok {