diff --git a/node/node.go b/node/node.go index ac0732c6e..f7d0a92fb 100644 --- a/node/node.go +++ b/node/node.go @@ -6,16 +6,34 @@ import ( "harmony-benchmark/log" "net" "os" + "sync" ) +var pendingTxMutex = &sync.Mutex{} + // A node represents a program (machine) participating in the network type Node struct { - consensus *consensus.Consensus - BlockChannel chan blockchain.Block - pendingTransactions []blockchain.Transaction - blockchain *blockchain.Blockchain - utxoPool *blockchain.UTXOPool - log log.Logger + consensus *consensus.Consensus + BlockChannel chan blockchain.Block + pendingTransactions []*blockchain.Transaction + transactionInConsensus []*blockchain.Transaction + blockchain *blockchain.Blockchain + utxoPool *blockchain.UTXOPool + log log.Logger +} + +func (node *Node) addPendingTransactions(newTxs []*blockchain.Transaction) { + pendingTxMutex.Lock() + node.pendingTransactions = append(node.pendingTransactions, newTxs...) + pendingTxMutex.Unlock() +} + +func (node *Node) getTransactionsForNewBlock() []*blockchain.Transaction { + pendingTxMutex.Lock() + selected, unselected := node.utxoPool.SelectTransactionsForNewBlock(node.pendingTransactions) + node.pendingTransactions = unselected + pendingTxMutex.Unlock() + return selected } // Start a server and process the request by a handler. diff --git a/node/node_handler.go b/node/node_handler.go index afd0b7cdb..ee356effc 100644 --- a/node/node_handler.go +++ b/node/node_handler.go @@ -76,12 +76,12 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) { case SEND: txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the SEND messge type - txList := new([]blockchain.Transaction) + txList := new([]*blockchain.Transaction) err := txDecoder.Decode(&txList) if err != nil { node.log.Error("Failed deserializing transaction list", "node", node) } - node.pendingTransactions = append(node.pendingTransactions, *txList...) + node.addPendingTransactions(*txList) case REQUEST: reader := bytes.NewBuffer(msgPayload[1:]) var txIds map[[32]byte]bool @@ -95,7 +95,7 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) { txIds[getFixedByteTxId(txId)] = true } - var txToReturn []blockchain.Transaction + var txToReturn []*blockchain.Transaction for _, tx := range node.pendingTransactions { if txIds[getFixedByteTxId(tx.ID)] { txToReturn = append(txToReturn, tx) @@ -125,14 +125,8 @@ func (node *Node) WaitForConsensusReady(readySignal chan int) { for { if len(node.pendingTransactions) >= 10 { node.log.Debug("Creating new block", "node", node) - // TODO (Minh): package actual transactions - // For now, just take out 10 transactions - var txList []*blockchain.Transaction - for _, tx := range node.pendingTransactions[0:10] { - txList = append(txList, &tx) - } - node.pendingTransactions = node.pendingTransactions[10:] - newBlock = blockchain.NewBlock(txList, []byte{}) + selectedTxs := node.getTransactionsForNewBlock() + newBlock = blockchain.NewBlock(selectedTxs, []byte{}) break } time.Sleep(1 * time.Second) // Periodically check whether we have enough transactions to package into block.