Add logic to select transactions from pendingTxs using UTXO pool

pull/7/head
Rongjian Lan 7 years ago
parent f09332d5b5
commit d5214f3b3d
  1. 3
      aws-code/transaction_generator.go
  2. 11
      consensus/consensus_leader.go
  3. 5
      consensus/consensus_validator.go
  4. 15
      node/node.go
  5. 13
      node/node_handler.go

@ -64,12 +64,13 @@ func readConfigFile(configFile string) [][]string {
func main() { func main() {
configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config") configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config")
numTxsPerBatch := flag.Int("num_txs_per_batch", 100, "number of transactions to send per message")
flag.Parse() flag.Parse()
config := readConfigFile(*configFile) config := readConfigFile(*configFile)
start := time.Now() start := time.Now()
totalTime := 60.0 totalTime := 60.0
txs := make([]blockchain.Transaction, 10) txs := make([]blockchain.Transaction, *numTxsPerBatch)
leaders := getLeaders(&config) leaders := getLeaders(&config)
for true { for true {
t := time.Now() t := time.Now()

@ -39,7 +39,6 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) {
consensus.Log.Error("Failed to get consensus message payload.", "err", err, "consensus", consensus) consensus.Log.Error("Failed to get consensus message payload.", "err", err, "consensus", consensus)
} }
consensus.Log.Debug("Received and processing message", "sharedId", consensus.ShardId, "msgType", msgType, "consensus", consensus)
switch msgType { switch msgType {
case ANNOUNCE: case ANNOUNCE:
consensus.Log.Error("Unexpected message type", "msgType", msgType, "consensus", consensus) consensus.Log.Error("Unexpected message type", "msgType", msgType, "consensus", consensus)
@ -165,7 +164,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte) {
shouldProcess := !ok && consensus.state == ANNOUNCE_DONE shouldProcess := !ok && consensus.state == ANNOUNCE_DONE
if shouldProcess { if shouldProcess {
consensus.commits[validatorId] = validatorId consensus.commits[validatorId] = validatorId
consensus.Log.Debug("Commits received", "numOfCommits", len(consensus.commits), "consensus", consensus) //consensus.Log.Debug("Number of commits received", "count", len(consensus.commits))
} }
mutex.Unlock() mutex.Unlock()
@ -175,7 +174,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte) {
mutex.Lock() mutex.Lock()
if len(consensus.commits) >= (2*len(consensus.validators))/3+1 { if len(consensus.commits) >= (2*len(consensus.validators))/3+1 {
consensus.Log.Debug("Enough commits received with signatures", "numOfSignatures", len(consensus.commits), "consensus", consensus) consensus.Log.Debug("Enough commits received with signatures", "numOfSignatures", len(consensus.commits))
if consensus.state == ANNOUNCE_DONE { if consensus.state == ANNOUNCE_DONE {
// Set state to CHALLENGE_DONE // Set state to CHALLENGE_DONE
consensus.state = CHALLENGE_DONE consensus.state = CHALLENGE_DONE
@ -282,7 +281,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
shouldProcess := !ok && consensus.state == CHALLENGE_DONE shouldProcess := !ok && consensus.state == CHALLENGE_DONE
if shouldProcess { if shouldProcess {
consensus.responses[validatorId] = validatorId consensus.responses[validatorId] = validatorId
consensus.Log.Debug("Number of responses received", "count", len(consensus.responses), "consensus", consensus) //consensus.Log.Debug("Number of responses received", "count", len(consensus.responses))
} }
mutex.Unlock() mutex.Unlock()
@ -292,13 +291,13 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
mutex.Lock() mutex.Lock()
if len(consensus.responses) >= (2*len(consensus.validators))/3+1 { if len(consensus.responses) >= (2*len(consensus.validators))/3+1 {
consensus.Log.Debug("Consensus reached with signatures.", "numOfSignatures", len(consensus.responses), "consensus", consensus) consensus.Log.Debug("Consensus reached with signatures.", "numOfSignatures", len(consensus.responses))
if consensus.state == CHALLENGE_DONE { if consensus.state == CHALLENGE_DONE {
// Set state to FINISHED // Set state to FINISHED
consensus.state = FINISHED consensus.state = FINISHED
// TODO: do followups on the consensus // TODO: do followups on the consensus
consensus.Log.Debug("HOORAY!!! CONSENSUS REACHED!!!", "numOfNodes", len(consensus.validators), "consensus", consensus) consensus.Log.Debug("HOORAY!!! CONSENSUS REACHED!!!", "numOfNodes", len(consensus.validators))
consensus.ResetState() consensus.ResetState()
consensus.consensusId++ consensus.consensusId++

@ -18,7 +18,6 @@ func (consensus *Consensus) ProcessMessageValidator(message []byte) {
consensus.Log.Error("Failed to get consensus message payload", "err", err, "consensus", consensus) consensus.Log.Error("Failed to get consensus message payload", "err", err, "consensus", consensus)
} }
consensus.Log.Info("Received and processing message", "msgType", msgType, "consensus", consensus, "consensus", consensus)
switch msgType { switch msgType {
case ANNOUNCE: case ANNOUNCE:
consensus.processAnnounceMessage(payload) consensus.processAnnounceMessage(payload)
@ -73,7 +72,7 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) {
copy(blockHash[:32], consensus.blockHash[:]) copy(blockHash[:32], consensus.blockHash[:])
// verify block data // verify block data
if consensusId != consensus.consensusId { if consensusId != consensus.consensusId {
consensus.Log.Debug("Received message", "fromConsensus", consensus, "myConsensus", consensus) consensus.Log.Debug("Received message", "fromConsensus", consensus)
return return
} }
// sign block // sign block
@ -163,7 +162,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) {
// verify block data and the aggregated signatures // verify block data and the aggregated signatures
if consensusId != consensus.consensusId { if consensusId != consensus.consensusId {
consensus.Log.Debug("Received message", "fromConsensus", consensusId, "consensus", consensus) consensus.Log.Debug("Received message", "fromConsensus", consensusId)
return return
} }

@ -58,7 +58,6 @@ func (node *Node) listenOnPort(port string) {
} }
} }
func (node *Node) String() string { func (node *Node) String() string {
return node.consensus.String() return node.consensus.String()
} }
@ -66,14 +65,22 @@ func (node *Node) String() string {
// Create a new Node // Create a new Node
func NewNode(consensus *consensus.Consensus) Node { func NewNode(consensus *consensus.Consensus) Node {
node := Node{} node := Node{}
// Consensus and associated channel to communicate blocks
node.consensus = consensus node.consensus = consensus
node.BlockChannel = make(chan blockchain.Block) node.BlockChannel = make(chan blockchain.Block)
// Genesis Block
genesisBlock := &blockchain.Blockchain{}
genesisBlock.Blocks = make([]*blockchain.Block, 0)
coinbaseTx := blockchain.NewCoinbaseTX("harmony", "1") coinbaseTx := blockchain.NewCoinbaseTX("harmony", "1")
node.blockchain = &blockchain.Blockchain{} genesisBlock.Blocks = append(genesisBlock.Blocks, blockchain.NewGenesisBlock(coinbaseTx))
node.blockchain.Blocks = make([]*blockchain.Block, 0) node.blockchain = genesisBlock
node.blockchain.Blocks = append(node.blockchain.Blocks, blockchain.NewGenesisBlock(coinbaseTx))
// UTXO pool from Genesis block
node.utxoPool = blockchain.CreateUTXOPoolFromGenesisBlockChain(node.blockchain) node.utxoPool = blockchain.CreateUTXOPoolFromGenesisBlockChain(node.blockchain)
// Logger
node.log = node.consensus.Log node.log = node.consensus.Log
node.log.Debug("New node", "node", node) node.log.Debug("New node", "node", node)
return node return node

@ -123,11 +123,16 @@ func (node *Node) WaitForConsensusReady(readySignal chan int) {
// create a new block // create a new block
newBlock := new(blockchain.Block) newBlock := new(blockchain.Block)
for { for {
if len(node.pendingTransactions) >= 10 { if len(node.pendingTransactions) >= 100 {
node.log.Debug("Creating new block", "node", node)
selectedTxs := node.getTransactionsForNewBlock() selectedTxs := node.getTransactionsForNewBlock()
newBlock = blockchain.NewBlock(selectedTxs, []byte{}) if len(selectedTxs) == 0 {
break node.log.Debug("No transactions is selected for consensus", "pendingTx", len(node.pendingTransactions))
} else {
node.log.Debug("Creating new block", "node", node)
newBlock = blockchain.NewBlock(selectedTxs, []byte{})
node.log.Debug("Num of Pending Tx", "count", len(node.pendingTransactions))
break
}
} }
time.Sleep(1 * time.Second) // Periodically check whether we have enough transactions to package into block. time.Sleep(1 * time.Second) // Periodically check whether we have enough transactions to package into block.
} }

Loading…
Cancel
Save