fix conflict

pull/8/head
Minh Doan 6 years ago
commit cf0ce3ba85
  1. 2
      aws-code/transaction_generator.go
  2. 9
      benchmark_main.go
  3. 25
      blockchain/block.go
  4. 6
      blockchain/blockchain.go
  5. 8
      blockchain/transaction.go
  6. 8
      blockchain/utxopool.go
  7. 6
      consensus/consensus.go
  8. 51
      consensus/consensus_leader.go
  9. 11
      consensus/consensus_leader_test.go
  10. 92
      consensus/consensus_validator.go
  11. 4
      consensus/consensus_validator_test.go
  12. 40
      node/node_handler.go

@ -53,7 +53,7 @@ func getNewFakeTransactions(dataNode *node.Node, numTxs int) []*blockchain.Trans
// Spend the money of current UTXO to a random address in [1 - 1000]
txin := blockchain.TXInput{txId, index, address}
txout := blockchain.TXOutput{value, strconv.Itoa(rand.Intn(1000))}
tx := blockchain.Transaction{nil, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}}
tx := blockchain.Transaction{[32]byte{}, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}}
tx.SetID()
if count >= numTxs {

@ -76,6 +76,10 @@ func main() {
consensus := consensus.NewConsensus(*ip, *port, shardId, peers, leader)
node := node.NewNode(&consensus)
// Assign closure functions to the consensus object
consensus.BlockVerifier = node.VerifyNewBlock
consensus.OnConsensusDone = node.AddNewBlockToBlockchain
// Temporary testing code, to be removed.
node.AddMoreFakeTransactions()
@ -88,6 +92,11 @@ func main() {
go func() {
node.WaitForConsensusReady(consensus.ReadySignal)
}()
} else {
// Node waiting to add new block to the blockchain
go func() {
node.WaitForConsensusReady(consensus.ReadySignal)
}()
}
node.StartServer(*port)

@ -9,12 +9,19 @@ import (
"time"
)
// Block keeps block headers.
// Block keeps block headers, transactions and signature.
type Block struct {
Timestamp int64
// Header
Timestamp int64
PrevBlockHash [32]byte
Hash [32]byte
NumTransactions int32
TransactionIds [][32]byte
// Transactions
Transactions []*Transaction
PrevBlockHash [32]byte
Hash [32]byte
// Signature...
}
// Serialize serializes the block
@ -56,7 +63,7 @@ func (b *Block) HashTransactions() []byte {
var txHash [32]byte
for _, tx := range b.Transactions {
txHashes = append(txHashes, tx.ID)
txHashes = append(txHashes, tx.ID[:])
}
txHash = sha256.Sum256(bytes.Join(txHashes, []byte{}))
return txHash[:]
@ -64,7 +71,13 @@ func (b *Block) HashTransactions() []byte {
// NewBlock creates and returns a neew block.
func NewBlock(transactions []*Transaction, prevBlockHash [32]byte) *Block {
block := &Block{time.Now().Unix(), transactions, prevBlockHash, [32]byte{}}
numTxs := int32(len(transactions))
var txIds [][32]byte
for _, tx := range transactions {
txIds = append(txIds, tx.ID)
}
block := &Block{time.Now().Unix(), prevBlockHash, [32]byte{},numTxs, txIds,transactions}
copy(block.Hash[:], block.HashTransactions()[:]) // TODO(Minh): the blockhash should be a hash of everything in the block
return block

@ -29,7 +29,7 @@ func (bc *Blockchain) FindUnspentTransactions(address string) []Transaction {
block := bc.Blocks[index]
for _, tx := range block.Transactions {
txID := hex.EncodeToString(tx.ID)
txID := hex.EncodeToString(tx.ID[:])
idx := -1
// TODO(minhdoan): Optimize this.
@ -85,7 +85,7 @@ func (bc *Blockchain) FindSpendableOutputs(address string, amount int) (int, map
Work:
for _, tx := range unspentTXs {
txID := hex.EncodeToString(tx.ID)
txID := hex.EncodeToString(tx.ID[:])
for outIdx, txOutput := range tx.TxOutput {
if txOutput.Address == address && accumulated < amount {
@ -132,7 +132,7 @@ func (bc *Blockchain) NewUTXOTransaction(from, to string, amount int) *Transacti
outputs = append(outputs, TXOutput{acc - amount, from}) // a change
}
tx := Transaction{nil, inputs, outputs}
tx := Transaction{[32]byte{}, inputs, outputs}
tx.SetID()
return &tx

@ -11,7 +11,7 @@ import (
// Transaction represents a Bitcoin transaction
type Transaction struct {
ID []byte // 32 byte hash
ID [32]byte // 32 byte hash
TxInput []TXInput
TxOutput []TXOutput
}
@ -43,7 +43,7 @@ func (tx *Transaction) SetID() {
log.Panic(err)
}
hash = sha256.Sum256(encoded.Bytes())
tx.ID = hash[:]
tx.ID = hash
}
// NewCoinbaseTX creates a new coinbase transaction
@ -54,7 +54,7 @@ func NewCoinbaseTX(to, data string) *Transaction {
txin := TXInput{[]byte{}, -1, data}
txout := TXOutput{DefaultCoinbaseValue, to}
tx := Transaction{nil, []TXInput{txin}, []TXOutput{txout}}
tx := Transaction{[32]byte{}, []TXInput{txin}, []TXOutput{txout}}
tx.SetID()
return &tx
}
@ -76,7 +76,7 @@ func (txOutput *TXOutput) String() string {
// Used for debuging.
func (tx *Transaction) String() string {
res := fmt.Sprintf("ID: %v\n", hex.EncodeToString(tx.ID))
res := fmt.Sprintf("ID: %v\n", hex.EncodeToString(tx.ID[:]))
res += fmt.Sprintf("TxInput:\n")
for id, value := range tx.TxInput {
res += fmt.Sprintf("%v: %v\n", id, value.String())

@ -79,7 +79,7 @@ func (utxoPool *UTXOPool) VerifyTransactions(transactions []*Transaction) bool {
// VerifyOneTransaction verifies if a list of transactions valid.
func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction) bool {
spentTXOs := make(map[string]map[string]map[int]bool)
txID := hex.EncodeToString(tx.ID)
txID := hex.EncodeToString(tx.ID[:])
inTotal := 0
// Calculate the sum of TxInput
for _, in := range tx.TxInput {
@ -121,7 +121,7 @@ func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction) bool {
// UpdateOneTransaction updates utxoPool in respect to the new Transaction.
func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
if utxoPool != nil {
txID := hex.EncodeToString(tx.ID)
txID := hex.EncodeToString(tx.ID[:])
// Remove
for _, in := range tx.TxInput {
@ -166,7 +166,7 @@ func (utxoPool *UTXOPool) VerifyAndUpdate(transactions []*Transaction) bool {
func (utxoPool *UTXOPool) Update(transactions []*Transaction) {
if utxoPool != nil {
for _, tx := range transactions {
curTxID := hex.EncodeToString(tx.ID)
curTxID := hex.EncodeToString(tx.ID[:])
// Remove
for _, in := range tx.TxInput {
@ -192,7 +192,7 @@ func (utxoPool *UTXOPool) Update(transactions []*Transaction) {
// CreateUTXOPoolFromTransaction a Utxo pool from a genesis transaction.
func CreateUTXOPoolFromTransaction(tx *Transaction) *UTXOPool {
var utxoPool UTXOPool
txID := hex.EncodeToString(tx.ID)
txID := hex.EncodeToString(tx.ID[:])
utxoPool.UtxoMap = make(map[string]map[string]map[int]int)
for index, out := range tx.TxOutput {
utxoPool.UtxoMap[out.Address] = make(map[string]map[int]int)

@ -8,6 +8,7 @@ import (
"harmony-benchmark/p2p"
"regexp"
"strconv"
"harmony-benchmark/blockchain"
)
// Consensus data containing all info related to one consensus process
@ -40,6 +41,11 @@ type Consensus struct {
// Signal channel for starting a new consensus process
ReadySignal chan int
// The verifier func passed from Node object
BlockVerifier func(*blockchain.Block)bool
// The post-consensus processing func passed from Node object
// Called when consensus on a new block is done
OnConsensusDone func(*blockchain.Block)
//// Network related fields
msgCategory byte

@ -6,11 +6,10 @@ import (
"bytes"
"crypto/sha256"
"encoding/binary"
"errors"
"fmt"
"harmony-benchmark/blockchain"
"harmony-benchmark/p2p"
"strings"
"encoding/gob"
)
var mutex = &sync.Mutex{}
@ -63,21 +62,23 @@ func (consensus *Consensus) processStartConsensusMessage(payload []byte) {
func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) {
// prepare message and broadcast to validators
// Construct new block
//newBlock := constructNewBlock()
copy(newBlock.Hash[:32], consensus.blockHash[:])
msgToSend, err := consensus.constructAnnounceMessage()
if err != nil {
return
}
// Copy over block hash and block header data
copy(consensus.blockHash[:], newBlock.Hash[:])
byteBuffer := bytes.NewBuffer([]byte{})
encoder := gob.NewEncoder(byteBuffer)
encoder.Encode(newBlock)
consensus.blockHeader = byteBuffer.Bytes()
msgToSend := consensus.constructAnnounceMessage()
// Set state to ANNOUNCE_DONE
consensus.state = ANNOUNCE_DONE
p2p.BroadcastMessage(consensus.validators, msgToSend)
}
// Construct the announce message to send to validators
func (consensus Consensus) constructAnnounceMessage() ([]byte, error) {
func (consensus Consensus) constructAnnounceMessage() []byte {
buffer := bytes.NewBuffer([]byte{})
// 4 byte consensus id
@ -86,9 +87,6 @@ func (consensus Consensus) constructAnnounceMessage() ([]byte, error) {
buffer.Write(fourBytes)
// 32 byte block hash
if len(consensus.blockHash) != 32 {
return buffer.Bytes(), errors.New(fmt.Sprintf("Block Hash size is %d bytes", len(consensus.blockHash)))
}
buffer.Write(consensus.blockHash[:])
// 2 byte leader id
@ -97,11 +95,10 @@ func (consensus Consensus) constructAnnounceMessage() ([]byte, error) {
buffer.Write(twoBytes)
// n byte of block header
blockHeader := getBlockHeader()
buffer.Write(blockHeader)
buffer.Write(consensus.blockHeader)
// 4 byte of payload size
sizeOfPayload := uint32(len(blockHeader))
sizeOfPayload := uint32(len(consensus.blockHeader))
binary.BigEndian.PutUint32(fourBytes, sizeOfPayload)
buffer.Write(fourBytes)
@ -109,17 +106,7 @@ func (consensus Consensus) constructAnnounceMessage() ([]byte, error) {
signature := signMessage(buffer.Bytes())
buffer.Write(signature)
return consensus.ConstructConsensusMessage(ANNOUNCE, buffer.Bytes()), nil
}
// Get the hash of a block's byte stream
func getBlockHash(block []byte) [32]byte {
return sha256.Sum256(block)
}
// TODO: fill in this function
func getBlockHeader() []byte {
return make([]byte, 200)
return consensus.ConstructConsensusMessage(ANNOUNCE, buffer.Bytes())
}
func signMessage(message []byte) []byte {
@ -300,6 +287,16 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
consensus.Log.Debug("HOORAY!!! CONSENSUS REACHED!!!", "numOfNodes", len(consensus.validators))
consensus.ResetState()
// TODO: reconstruct the whole block from header and transactions
// For now, we used the stored whole block in consensus.blockHeader
txDecoder := gob.NewDecoder(bytes.NewReader(consensus.blockHeader))
var blockHeaderObj blockchain.Block
err := txDecoder.Decode(&blockHeaderObj)
if err != nil {
consensus.Log.Debug("failed to construct the new block after consensus")
}
consensus.OnConsensusDone(&blockHeaderObj)
consensus.consensusId++
// Send signal to Node so the new block can be added and new round of consensus can be triggered

@ -6,16 +6,13 @@ import (
)
func TestConstructAnnounceMessage(test *testing.T) {
header := getBlockHeader()
leader := p2p.Peer{Ip: "1", Port: "2"}
validator := p2p.Peer{Ip: "3", Port: "5"}
consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
consensus.blockHash = getBlockHash(make([]byte, 10))
msg, err := consensus.constructAnnounceMessage()
consensus.blockHash = [32]byte{}
header := consensus.blockHeader
msg := consensus.constructAnnounceMessage()
if err != nil {
test.Error("Annouce message is not constructed successfully")
}
if len(msg) != 1+1+1+4+32+2+4+64+len(header) {
test.Errorf("Annouce message is not constructed in the correct size: %d", len(msg))
}
@ -25,7 +22,7 @@ func TestConstructChallengeMessage(test *testing.T) {
leader := p2p.Peer{Ip: "1", Port: "2"}
validator := p2p.Peer{Ip: "3", Port: "5"}
consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
consensus.blockHash = getBlockHash(make([]byte, 10))
consensus.blockHash = [32]byte{}
msg := consensus.constructChallengeMessage()
if len(msg) != 1+1+1+4+32+2+33+33+32+64 {

@ -4,6 +4,10 @@ import (
"bytes"
"encoding/binary"
"harmony-benchmark/p2p"
"strconv"
"regexp"
"encoding/gob"
"harmony-benchmark/blockchain"
)
// Validator's consensus message dispatcher
@ -43,8 +47,8 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) {
blockHash := payload[offset : offset+32]
offset += 32
// 2 byte validator id
leaderId := string(payload[offset : offset+2])
// 2 byte leader id
leaderId := binary.BigEndian.Uint16(payload[offset : offset+2])
offset += 2
// n byte of block header
@ -63,19 +67,52 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) {
// TODO: make use of the data. This is just to avoid the unused variable warning
_ = consensusId
_ = blockHash
_ = leaderId
_ = blockHeader
_ = blockHeaderSize
_ = signature
copy(blockHash[:32], consensus.blockHash[:])
// verify block data
copy(consensus.blockHash[:], blockHash[:])
// Verify block data
// check consensus Id
if consensusId != consensus.consensusId {
consensus.Log.Debug("Received message", "fromConsensus", consensus)
consensus.Log.Debug("[ERROR] Received message with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId)
return
}
// check leader Id
leaderPrivKey := consensus.leader.Ip + consensus.leader.Port
reg, _ := regexp.Compile("[^0-9]+")
socketId := reg.ReplaceAllString(leaderPrivKey, "")
value, _ := strconv.Atoi(socketId)
if leaderId != uint16(value) {
consensus.Log.Debug("[ERROR] Received message from wrong leader", "myLeaderId", consensus.consensusId, "receivedLeaderId", consensusId)
return
}
// check block header is valid
txDecoder := gob.NewDecoder(bytes.NewReader(blockHeader))
var blockHeaderObj blockchain.Block // TODO: separate header from block. Right now, this blockHeader data is actually the whole block
err := txDecoder.Decode(&blockHeaderObj)
if err != nil {
consensus.Log.Debug("[ERROR] Unparseable block header data")
return
}
consensus.blockHeader = blockHeader
// check block hash
if bytes.Compare(blockHash[:], blockHeaderObj.HashTransactions()[:]) != 0 || bytes.Compare(blockHeaderObj.Hash[:], blockHeaderObj.HashTransactions()[:]) != 0 {
consensus.Log.Debug("[ERROR] Block hash doesn't match")
return
}
// check block data (transactions
if !consensus.BlockVerifier(&blockHeaderObj) {
consensus.Log.Debug("[ERROR] Block content is not verified successfully")
return
}
// sign block
// TODO: return the signature(commit) to leader
// For now, simply return the private key of this node.
@ -114,8 +151,8 @@ func (consensus Consensus) constructCommitMessage() []byte {
return consensus.ConstructConsensusMessage(COMMIT, buffer.Bytes())
}
// TODO: fill in this function
func getCommitMessage() []byte {
// TODO: use real cosi signature
return make([]byte, 33)
}
@ -131,7 +168,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) {
offset += 32
// 2 byte leader id
leaderId := string(payload[offset : offset+2])
leaderId := binary.BigEndian.Uint16(payload[offset : offset+2])
offset += 2
// 33 byte of aggregated commit
@ -160,13 +197,30 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) {
_ = challenge
_ = signature
// verify block data and the aggregated signatures
// erify block data and the aggregated signatures
// check consensus Id
if consensusId != consensus.consensusId {
consensus.Log.Debug("Received message", "fromConsensus", consensusId)
consensus.Log.Debug("[ERROR] Received message with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId)
return
}
// sign the message
// check leader Id
leaderPrivKey := consensus.leader.Ip + consensus.leader.Port
reg, _ := regexp.Compile("[^0-9]+")
socketId := reg.ReplaceAllString(leaderPrivKey, "")
value, _ := strconv.Atoi(socketId)
if leaderId != uint16(value) {
consensus.Log.Debug("[ERROR] Received message from wrong leader", "myLeaderId", consensus.consensusId, "receivedLeaderId", consensusId)
return
}
// check block hash
if bytes.Compare(blockHash[:], consensus.blockHash[:]) != 0 {
consensus.Log.Debug("[ERROR] Block hash doesn't match")
return
}
// TODO: verify aggregated commits with real schnor cosign verification
// TODO: return the signature(response) to leader
// For now, simply return the private key of this node.
@ -176,6 +230,20 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) {
// Set state to RESPONSE_DONE
consensus.state = RESPONSE_DONE
consensus.consensusId++
// TODO: think about when validators know about the consensus is reached.
// For now, the blockchain is updated right here.
// TODO: reconstruct the whole block from header and transactions
// For now, we used the stored whole block in consensus.blockHeader
txDecoder := gob.NewDecoder(bytes.NewReader(consensus.blockHeader))
var blockHeaderObj blockchain.Block
err := txDecoder.Decode(&blockHeaderObj)
if err != nil {
consensus.Log.Debug("failed to construct the new block after consensus")
}
consensus.OnConsensusDone(&blockHeaderObj)
}
// Construct the response message to send to leader (assumption the consensus data is already verified)

@ -9,7 +9,7 @@ func TestConstructCommitMessage(test *testing.T) {
leader := p2p.Peer{Ip: "1", Port: "2"}
validator := p2p.Peer{Ip: "3", Port: "5"}
consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
consensus.blockHash = getBlockHash(make([]byte, 10))
consensus.blockHash = [32]byte{}
msg := consensus.constructCommitMessage()
if len(msg) != 1+1+1+4+32+2+33+64 {
@ -21,7 +21,7 @@ func TestConstructResponseMessage(test *testing.T) {
leader := p2p.Peer{Ip: "1", Port: "2"}
validator := p2p.Peer{Ip: "3", Port: "5"}
consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
consensus.blockHash = getBlockHash(make([]byte, 10))
consensus.blockHash = [32]byte{}
msg := consensus.constructResponseMessage()
if len(msg) != 1+1+1+4+32+2+32+64 {

@ -85,19 +85,21 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) {
case REQUEST:
reader := bytes.NewBuffer(msgPayload[1:])
var txIds map[[32]byte]bool
txId := make([]byte, 32) // 32 byte hash Id
buf := make([]byte, 32) // 32 byte hash Id
for {
_, err := reader.Read(txId)
_, err := reader.Read(buf)
if err != nil {
break
}
txIds[getFixedByteTxId(txId)] = true
var txId [32]byte
copy(txId[:], buf)
txIds[txId] = true
}
var txToReturn []*blockchain.Transaction
for _, tx := range node.pendingTransactions {
if txIds[getFixedByteTxId(tx.ID)] {
if txIds[tx.ID] {
txToReturn = append(txToReturn, tx)
}
}
@ -106,15 +108,6 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) {
}
}
// getFixedByteTxId copies the txId byte slice over to 32 byte array so the map can key on it
func getFixedByteTxId(txId []byte) [32]byte {
var id [32]byte
for i := range id {
id[i] = txId[i]
}
return id
}
// WaitForConsensusReady ...
func (node *Node) WaitForConsensusReady(readySignal chan int) {
node.log.Debug("Waiting for consensus ready", "node", node)
@ -123,14 +116,6 @@ func (node *Node) WaitForConsensusReady(readySignal chan int) {
for { // keep waiting for consensus ready
<-readySignal
//node.log.Debug("Adding new block", "currentChainSize", len(node.blockchain.Blocks), "numTxs", len(node.blockchain.GetLatestBlock().Transactions), "PrevHash", node.blockchain.GetLatestBlock().PrevBlockHash, "Hash", node.blockchain.GetLatestBlock().Hash)
if newBlock != nil {
// Consensus is done on the newBlock (in the previous round of consensus), add it to blockchain
node.blockchain.Blocks = append(node.blockchain.Blocks, newBlock)
// Update UTXO pool
node.UtxoPool.Update(node.transactionInConsensus)
// Clear transaction-in-consensus list
node.transactionInConsensus = []*blockchain.Transaction{}
}
for {
// Once we have more than 10 transactions pending we will try creating a new block
if len(node.pendingTransactions) >= 10 {
@ -155,3 +140,16 @@ func (node *Node) WaitForConsensusReady(readySignal chan int) {
node.BlockChannel <- *newBlock
}
}
func (node *Node) VerifyNewBlock(newBlock *blockchain.Block) bool {
return node.UtxoPool.VerifyTransactions(newBlock.Transactions)
}
func (node *Node) AddNewBlockToBlockchain(newBlock *blockchain.Block) {
// Add it to blockchain
node.blockchain.Blocks = append(node.blockchain.Blocks, newBlock)
// Update UTXO pool
node.UtxoPool.Update(newBlock.Transactions)
// Clear transaction-in-consensus list
node.transactionInConsensus = []*blockchain.Transaction{}
}

Loading…
Cancel
Save