integrate account model into consensus without the block verification part

pull/86/head
Rongjian Lan 6 years ago
parent a9b47b07f7
commit 9fbfee9907
  1. 31
      benchmark.go
  2. 3
      blockchain/block.go
  3. 3
      client/txgen/main.go
  4. 2
      consensus/bft.go
  5. 17
      consensus/consensus_leader.go
  6. 9
      consensus/consensus_validator.go
  7. 9
      node/node_handler.go
  8. BIN
      transactions.rlp

@ -74,6 +74,7 @@ func loggingInit(logFolder, role, ip, port string, onlyLogTps bool) {
} }
func main() { func main() {
accountModel := flag.Bool("account_model", false, "Whether to use account model")
// TODO: use http://getmyipaddress.org/ or http://www.get-myip.com/ to retrieve my IP address // TODO: use http://getmyipaddress.org/ or http://www.get-myip.com/ to retrieve my IP address
ip := flag.String("ip", "127.0.0.1", "IP of the node") ip := flag.String("ip", "127.0.0.1", "IP of the node")
port := flag.String("port", "9000", "port of the node.") port := flag.String("port", "9000", "port of the node.")
@ -190,20 +191,34 @@ func main() {
// Assign closure functions to the consensus object // Assign closure functions to the consensus object
consensus.BlockVerifier = currentNode.VerifyNewBlock consensus.BlockVerifier = currentNode.VerifyNewBlock
if *accountModel {
consensus.BlockVerifier = currentNode.VerifyNewBlockAccount
}
consensus.OnConsensusDone = currentNode.PostConsensusProcessing consensus.OnConsensusDone = currentNode.PostConsensusProcessing
// Temporary testing code, to be removed. // Temporary testing code, to be removed.
currentNode.AddTestingAddresses(10000) currentNode.AddTestingAddresses(10000)
if consensus.IsLeader { if consensus.IsLeader {
// Let consensus run if *accountModel {
go func() { // Let consensus run
consensus.WaitForNewBlock(currentNode.BlockChannel) go func() {
}() consensus.WaitForNewBlockAccount(currentNode.BlockChannelAccount)
// Node waiting for consensus readiness to create new block }()
go func() { // Node waiting for consensus readiness to create new block
currentNode.WaitForConsensusReady(consensus.ReadySignal) go func() {
}() currentNode.WaitForConsensusReadyAccount(consensus.ReadySignal)
}()
} else {
// Let consensus run
go func() {
consensus.WaitForNewBlock(currentNode.BlockChannel)
}()
// Node waiting for consensus readiness to create new block
go func() {
currentNode.WaitForConsensusReady(consensus.ReadySignal)
}()
}
} }
currentNode.StartServer(*port) currentNode.StartServer(*port)

@ -5,6 +5,7 @@ import (
"crypto/sha256" "crypto/sha256"
"encoding/gob" "encoding/gob"
"fmt" "fmt"
"github.com/harmony-one/harmony/core/types"
"log" "log"
"time" "time"
@ -27,6 +28,8 @@ type Block struct {
// Signature... // Signature...
Bitmap []byte // Contains which validator signed the block. Bitmap []byte // Contains which validator signed the block.
Signature [66]byte // Schnorr collective signature. Signature [66]byte // Schnorr collective signature.
AccountBlock *types.Block // Temporary piggy-back.
} }
// State is used in Block to indicate that block is a state block. // State is used in Block to indicate that block is a state block.

@ -252,7 +252,7 @@ func main() {
maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 20000, "number of transactions to send per message") maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 20000, "number of transactions to send per message")
logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution") logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
numSubset := flag.Int("numSubset", 3, "the number of subsets of utxos to process separately") numSubset := flag.Int("numSubset", 3, "the number of subsets of utxos to process separately")
duration := flag.Int("duration", 120, "duration of the tx generation in second. If it's negative, the experiment runs forever.") duration := flag.Int("duration", 60, "duration of the tx generation in second. If it's negative, the experiment runs forever.")
versionFlag := flag.Bool("version", false, "Output version info") versionFlag := flag.Bool("version", false, "Output version info")
crossShardRatio := flag.Int("cross_shard_ratio", 30, "The percentage of cross shard transactions.") crossShardRatio := flag.Int("cross_shard_ratio", 30, "The percentage of cross shard transactions.")
flag.Parse() flag.Parse()
@ -392,6 +392,7 @@ func main() {
msg := proto_node.ConstructStopMessage() msg := proto_node.ConstructStopMessage()
peers := append(config.GetValidators(), clientNode.Client.GetLeaders()...) peers := append(config.GetValidators(), clientNode.Client.GetLeaders()...)
p2p.BroadcastMessage(peers, msg) p2p.BroadcastMessage(peers, msg)
time.Sleep(3000 * time.Millisecond)
} }
func SendTxsToLeader(leader p2p.Peer, txs []*blockchain.Transaction) { func SendTxsToLeader(leader p2p.Peer, txs []*blockchain.Transaction) {

@ -80,7 +80,7 @@ func (bft *Bft) Finalize(chain ChainReader, header *types.Header, state *state.S
// Accumulate any block and uncle rewards and commit the final state root // Accumulate any block and uncle rewards and commit the final state root
// Header seems complete, assemble into a block and return // Header seems complete, assemble into a block and return
accumulateRewards(chain.Config(), state, header) accumulateRewards(chain.Config(), state, header)
header.Root = state.IntermediateRoot(chain.Config().IsEIP158(header.Number)) header.Root = state.IntermediateRoot(false)
return types.NewBlock(header, txs, receipts), nil return types.NewBlock(header, txs, receipts), nil
} }

@ -6,6 +6,7 @@ import (
"encoding/gob" "encoding/gob"
"encoding/hex" "encoding/hex"
"errors" "errors"
"github.com/harmony-one/harmony/core/types"
"time" "time"
"github.com/harmony-one/harmony/profiler" "github.com/harmony-one/harmony/profiler"
@ -39,6 +40,22 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block)
} }
} }
// WaitForNewBlock waits for the next new block to run consensus on
func (consensus *Consensus) WaitForNewBlockAccount(blockChannel chan *types.Block) {
consensus.Log.Debug("Waiting for block", "consensus", consensus)
for { // keep waiting for new blocks
newBlock := <-blockChannel
// TODO: think about potential race condition
startTime = time.Now()
consensus.Log.Debug("STARTING CONSENSUS", "consensus", consensus, "startTime", startTime)
for consensus.state == Finished {
// time.Sleep(500 * time.Millisecond)
consensus.startConsensus(&blockchain.Block{Hash: newBlock.Hash(), AccountBlock: newBlock})
break
}
}
}
// ProcessMessageLeader dispatches consensus message for the leader. // ProcessMessageLeader dispatches consensus message for the leader.
func (consensus *Consensus) ProcessMessageLeader(message []byte) { func (consensus *Consensus) ProcessMessageLeader(message []byte) {
msgType, err := proto_consensus.GetConsensusMessageType(message) msgType, err := proto_consensus.GetConsensusMessageType(message)

@ -109,11 +109,12 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) {
return return
} }
// Temporary disabling this for account model testing
// check block hash // check block hash
if !bytes.Equal(blockHash[:], blockHeaderObj.CalculateBlockHash()[:]) || !bytes.Equal(blockHeaderObj.Hash[:], blockHeaderObj.CalculateBlockHash()[:]) { //if !bytes.Equal(blockHash[:], blockHeaderObj.CalculateBlockHash()[:]) || !bytes.Equal(blockHeaderObj.Hash[:], blockHeaderObj.CalculateBlockHash()[:]) {
consensus.Log.Warn("Block hash doesn't match", "consensus", consensus) // consensus.Log.Warn("Block hash doesn't match", "consensus", consensus)
return // return
} //}
// check block data (transactions // check block data (transactions
if !consensus.BlockVerifier(&blockHeaderObj) { if !consensus.BlockVerifier(&blockHeaderObj) {

@ -389,6 +389,10 @@ func (node *Node) WaitForConsensusReadyAccount(readySignal chan struct{}) {
} }
node.worker.CommitTransactions(txs, crypto.PubkeyToAddress(node.testBankKey.PublicKey)) node.worker.CommitTransactions(txs, crypto.PubkeyToAddress(node.testBankKey.PublicKey))
newBlock = node.worker.Commit() newBlock = node.worker.Commit()
// If not enough transactions to run Consensus,
// periodically check whether we have enough transactions to package into block.
time.Sleep(1 * time.Second)
} }
// Send the new block to Consensus so it can be confirmed. // Send the new block to Consensus so it can be confirmed.
@ -431,6 +435,11 @@ func (node *Node) VerifyNewBlock(newBlock *blockchain.Block) bool {
return node.UtxoPool.VerifyTransactions(newBlock.Transactions) return node.UtxoPool.VerifyTransactions(newBlock.Transactions)
} }
// VerifyNewBlock is called by consensus participants to verify the block (account model) they are running consensus on
func (node *Node) VerifyNewBlockAccount(newBlock *blockchain.Block) bool {
return true // TODO: implement the logic
}
// PostConsensusProcessing is called by consensus participants, after consensus is done, to: // PostConsensusProcessing is called by consensus participants, after consensus is done, to:
// 1. add the new block to blockchain // 1. add the new block to blockchain
// 2. [leader] move cross shard tx and proof to the list where they wait to be sent to the client // 2. [leader] move cross shard tx and proof to the list where they wait to be sent to the client

Binary file not shown.
Loading…
Cancel
Save