pull/6/head
alok 7 years ago
commit e653d688fd
  1. 14
      .travis.yml
  2. 4
      README.md
  3. 91
      aws-code/transaction_generator.go
  4. 30
      benchmark_main.go
  5. 32
      blockchain/block.go
  6. 4
      blockchain/block_test.go
  7. 30
      blockchain/blockchain.go
  8. 2
      blockchain/blockchain_test.go
  9. 8
      blockchain/transaction.go
  10. 175
      blockchain/utxopool.go
  11. 18
      blockchain/utxopool_test.go
  12. 82
      consensus/consensus.go
  13. 124
      consensus/consensus_leader.go
  14. 11
      consensus/consensus_leader_test.go
  15. 108
      consensus/consensus_validator.go
  16. 4
      consensus/consensus_validator_test.go
  17. BIN
      harmony-benchmark
  18. 11
      log/CONTRIBUTORS
  19. 13
      log/LICENSE
  20. 77
      log/README.md
  21. 5
      log/README_ETHEREUM.md
  22. 334
      log/doc.go
  23. 364
      log/format.go
  24. 359
      log/handler.go
  25. 227
      log/handler_glog.go
  26. 26
      log/handler_go13.go
  27. 23
      log/handler_go14.go
  28. 244
      log/logger.go
  29. 72
      log/root.go
  30. 57
      log/syslog.go
  31. 21
      log/term/LICENSE
  32. 13
      log/term/terminal_appengine.go
  33. 13
      log/term/terminal_darwin.go
  34. 18
      log/term/terminal_freebsd.go
  35. 14
      log/term/terminal_linux.go
  36. 7
      log/term/terminal_netbsd.go
  37. 20
      log/term/terminal_notwindows.go
  38. 7
      log/term/terminal_openbsd.go
  39. 9
      log/term/terminal_solaris.go
  40. 26
      log/term/terminal_windows.go
  41. 9
      node/message.go
  42. 217
      node/node.go
  43. 155
      node/node_handler.go
  44. 34
      node/node_test.go
  45. 17
      p2p/peer.go

@ -1,12 +1,14 @@
language: go language: go
go: go:
- master - master
install: | install:
export GOPATH=$HOME/gopath - export GOPATH=$HOME/gopath
mkdir -p $HOME/gopath/src/harmony-benchmark - mkdir -p $HOME/gopath/src/harmony-benchmark
rsync -az ${TRAVIS_BUILD_DIR}/ $HOME/gopath/src/harmony-benchmark/ - rsync -az ${TRAVIS_BUILD_DIR}/ $HOME/gopath/src/harmony-benchmark/
export TRAVIS_BUILD_DIR=$HOME/gopath/src/harmony-benchmark - export TRAVIS_BUILD_DIR=$HOME/gopath/src/harmony-benchmark
cd $HOME/gopath/src/harmony-benchmark - cd $HOME/gopath/src/harmony-benchmark
- go get github.com/go-stack/stack
- go build
notifications: notifications:
slack: slack:
secure: RPB3ThYIGuDUidvaWfOA7Hc9x1bDfd5+Y10r7xwY+NGCN3zW86s/GNLpLutI0MWTV9e2CJupHvz5clp8Ktle/tVjLhs6jHQnNV7U8PTWKkL5By6IFVAHN12unMQn/m0RPwqMfdubajXoV51XhbFA/iow/0fqwsd61VdPIuBrlQjy9z7kyVnRLNoGvYjDqKEkJfYVb3qFNFLzD0F7Y2AgxnezIRjsTLgHzR4owLJYqVMhvTYIV9/vSf1w4UUPzhHyZRESl6bri+a1+g7GxE32OtNwq68xxVeeJcrO/MbjAHHW9V6BW1MjJfYzD5T+7JHIfZOjV2WgzJ7uCkVYztfq+02yOCSWsLNxFVojIDhVFEhhJ6Vd2Zf1otolS7j0svK/qNmShID9q9NAasaI105GsQgtaSPAUGd88J/vyX2ndG1nDOvxmgOo10tZFOnPHW7JnWMybk3PLza8o1ujA7X3JFdvDA8BPP9h6MVP4N7doCQ/n4Crts53HvEWlvcv5sBNu61WYlSTBzf1qNwBKMyN2E0rNubsxKmW8B6jLdWYdlx57nyTRPraNKGE1fnUW5nWRZGax3F1tQRwEfpQMk22qgeUK0RYWsPgHFaPciKCA3dJX7t1k/ib9pyR4nc9SZnYw54KMhkAXPIVQ0iy0EpTAH1DNYV6v8zXCwjl+BdkhlY= secure: RPB3ThYIGuDUidvaWfOA7Hc9x1bDfd5+Y10r7xwY+NGCN3zW86s/GNLpLutI0MWTV9e2CJupHvz5clp8Ktle/tVjLhs6jHQnNV7U8PTWKkL5By6IFVAHN12unMQn/m0RPwqMfdubajXoV51XhbFA/iow/0fqwsd61VdPIuBrlQjy9z7kyVnRLNoGvYjDqKEkJfYVb3qFNFLzD0F7Y2AgxnezIRjsTLgHzR4owLJYqVMhvTYIV9/vSf1w4UUPzhHyZRESl6bri+a1+g7GxE32OtNwq68xxVeeJcrO/MbjAHHW9V6BW1MjJfYzD5T+7JHIfZOjV2WgzJ7uCkVYztfq+02yOCSWsLNxFVojIDhVFEhhJ6Vd2Zf1otolS7j0svK/qNmShID9q9NAasaI105GsQgtaSPAUGd88J/vyX2ndG1nDOvxmgOo10tZFOnPHW7JnWMybk3PLza8o1ujA7X3JFdvDA8BPP9h6MVP4N7doCQ/n4Crts53HvEWlvcv5sBNu61WYlSTBzf1qNwBKMyN2E0rNubsxKmW8B6jLdWYdlx57nyTRPraNKGE1fnUW5nWRZGax3F1tQRwEfpQMk22qgeUK0RYWsPgHFaPciKCA3dJX7t1k/ib9pyR4nc9SZnYw54KMhkAXPIVQ0iy0EpTAH1DNYV6v8zXCwjl+BdkhlY=

@ -12,6 +12,10 @@ mkdir -p $HOME/<path_of_your_choice>/src
cd $HOME/<path_of_your_choice>/src cd $HOME/<path_of_your_choice>/src
git clone git@github.com:simple-rules/harmony-benchmark.git git clone git@github.com:simple-rules/harmony-benchmark.git
cd harmony-benchmark
go get github.com/go-stack/stack
``` ```
## Usage ## Usage
``` ```

@ -5,22 +5,69 @@ import (
"flag" "flag"
"fmt" "fmt"
"harmony-benchmark/blockchain" "harmony-benchmark/blockchain"
"harmony-benchmark/log"
"harmony-benchmark/node" "harmony-benchmark/node"
"harmony-benchmark/p2p" "harmony-benchmark/p2p"
"log"
"math/rand" "math/rand"
"os" "os"
"strings" "strings"
"time" "time"
"harmony-benchmark/consensus"
"encoding/hex"
"strconv"
) )
func newRandTransaction() blockchain.Transaction { // Get numTxs number of Fake transactions based on the existing UtxoPool.
txin := blockchain.TXInput{[]byte{}, rand.Intn(100), string(rand.Uint64())} // The transactions are generated by going through the existing utxos and
txout := blockchain.TXOutput{rand.Intn(100), string(rand.Uint64())} // randomly select a subset of them as input to new transactions. The output
tx := blockchain.Transaction{nil, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}} // address of the new transaction are randomly selected from 1 - 1000.
tx.SetID() // NOTE: the genesis block should contain 1000 coinbase transactions adding
// value to each address in [1 - 1000]. See node.AddMoreFakeTransactions()
func getNewFakeTransactions(dataNode *node.Node, numTxs int) []*blockchain.Transaction {
/*
UTXO map structure:
address - [
txId1 - [
outputIndex1 - value1
outputIndex2 - value2
]
txId2 - [
outputIndex1 - value1
outputIndex2 - value2
]
]
*/
var outputs []*blockchain.Transaction
count := 0
countAll := 0
return tx for address, txMap := range dataNode.UtxoPool.UtxoMap {
for txIdStr, utxoMap := range txMap {
txId, err := hex.DecodeString(txIdStr)
if err != nil {
continue
}
for index, value := range utxoMap {
countAll++
if rand.Intn(100) <= 20 { // 20% sample rate to select UTXO to use for new transactions
// Spend the money of current UTXO to a random address in [1 - 1000]
txin := blockchain.TXInput{txId, index, address}
txout := blockchain.TXOutput{value, strconv.Itoa(rand.Intn(1000))}
tx := blockchain.Transaction{[32]byte{}, []blockchain.TXInput{txin}, []blockchain.TXOutput{txout}}
tx.SetID()
if count >= numTxs {
continue
}
outputs = append(outputs, &tx)
count++
}
}
}
}
log.Debug("UTXO", "poolSize", countAll, "numTxsToSend", numTxs)
return outputs
} }
func getValidators(config string) []p2p.Peer { func getValidators(config string) []p2p.Peer {
@ -63,27 +110,51 @@ func readConfigFile(configFile string) [][]string {
} }
func main() { func main() {
// Setup a stdout logger
h := log.CallerFileHandler(log.StdoutHandler)
log.Root().SetHandler(h)
configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config") configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config")
numTxsPerBatch := flag.Int("num_txs_per_batch", 1000, "number of transactions to send per message")
flag.Parse() flag.Parse()
config := readConfigFile(*configFile) config := readConfigFile(*configFile)
<<<<<<< HEAD
=======
leaders := getLeaders(&config)
// Testing node to mirror the node data in consensus
dataNode := node.NewNode(&consensus.Consensus{})
dataNode.AddMoreFakeTransactions()
>>>>>>> 31f111ed5d49c01a9c3a20e92e053016c05ebafc
start := time.Now() start := time.Now()
totalTime := 60.0 totalTime := 60.0
txs := make([]blockchain.Transaction, 10) time.Sleep(3 * time.Second) // wait for nodes to be ready
leaders := getLeaders(&config)
for true { for true {
t := time.Now() t := time.Now()
if t.Sub(start).Seconds() >= totalTime { if t.Sub(start).Seconds() >= totalTime {
fmt.Println(int(t.Sub(start)), start, totalTime) fmt.Println(int(t.Sub(start)), start, totalTime)
break break
} }
<<<<<<< HEAD
for i := range txs { for i := range txs {
txs[i] = newRandTransaction() txs[i] = newRandTransaction()
} }
msg := node.ConstructTransactionListMessage(txs) msg := node.ConstructTransactionListMessage(txs)
log.Printf("[Generator] Sending txs to %d leader[s]\n", len(leaders)) log.Printf("[Generator] Sending txs to %d leader[s]\n", len(leaders))
=======
txsToSend := getNewFakeTransactions(&dataNode, *numTxsPerBatch)
msg := node.ConstructTransactionListMessage(txsToSend)
log.Debug("[Generator] Sending txs...", "numTxs", len(txsToSend))
>>>>>>> 31f111ed5d49c01a9c3a20e92e053016c05ebafc
p2p.BroadcastMessage(leaders, msg) p2p.BroadcastMessage(leaders, msg)
time.Sleep(1 * time.Second) // 10 transactions per second
// Update local utxo pool to mirror the utxo pool of a real node
dataNode.UtxoPool.Update(txsToSend)
time.Sleep(1 * time.Second) // Send a batch of transactions every second
} }
// Send a stop message to stop the nodes at the end
msg := node.ConstructStopMessage() msg := node.ConstructStopMessage()
peers := append(getValidators(*configFile), leaders...) peers := append(getValidators(*configFile), leaders...)
p2p.BroadcastMessage(peers, msg) p2p.BroadcastMessage(peers, msg)

@ -4,9 +4,9 @@ import (
"bufio" "bufio"
"flag" "flag"
"harmony-benchmark/consensus" "harmony-benchmark/consensus"
"harmony-benchmark/log"
"harmony-benchmark/node" "harmony-benchmark/node"
"harmony-benchmark/p2p" "harmony-benchmark/p2p"
"log"
"os" "os"
"strings" "strings"
) )
@ -59,6 +59,10 @@ func readConfigFile(configFile string) [][]string {
} }
func main() { func main() {
// Setup a stdout logger
h := log.CallerFileHandler(log.StdoutHandler)
log.Root().SetHandler(h)
ip := flag.String("ip", "127.0.0.1", "IP of the node") ip := flag.String("ip", "127.0.0.1", "IP of the node")
port := flag.String("port", "9000", "port of the node.") port := flag.String("port", "9000", "port of the node.")
configFile := flag.String("config_file", "config.txt", "file containing all ip addresses") configFile := flag.String("config_file", "config.txt", "file containing all ip addresses")
@ -71,28 +75,26 @@ func main() {
consensus := consensus.NewConsensus(*ip, *port, shardId, peers, leader) consensus := consensus.NewConsensus(*ip, *port, shardId, peers, leader)
var nodeStatus string
if consensus.IsLeader {
nodeStatus = "leader"
} else {
nodeStatus = "validator"
}
log.Println("======================================")
log.Printf("This node is a %s node in shard %s listening on ip: %s and port: %s\n", nodeStatus, shardId, *ip, *port)
log.Println("======================================")
node := node.NewNode(&consensus) node := node.NewNode(&consensus)
// Assign closure functions to the consensus object
consensus.BlockVerifier = node.VerifyNewBlock
consensus.OnConsensusDone = node.AddNewBlockToBlockchain
// Temporary testing code, to be removed.
node.AddMoreFakeTransactions()
if consensus.IsLeader { if consensus.IsLeader {
// Let consensus run // Let consensus run
go func() { go func() {
log.Println("Waiting for block")
consensus.WaitForNewBlock(node.BlockChannel) consensus.WaitForNewBlock(node.BlockChannel)
}() }()
// Node waiting for consensus readiness to create new block // Node waiting for consensus readiness to create new block
go func() { go func() {
log.Println("Waiting for consensus ready") node.WaitForConsensusReady(consensus.ReadySignal)
}()
} else {
// Node waiting to add new block to the blockchain
go func() {
node.WaitForConsensusReady(consensus.ReadySignal) node.WaitForConsensusReady(consensus.ReadySignal)
}() }()
} }

@ -9,12 +9,19 @@ import (
"time" "time"
) )
// Block keeps block headers. // Block keeps block headers, transactions and signature.
type Block struct { type Block struct {
Timestamp int64 // Header
Timestamp int64
PrevBlockHash [32]byte
Hash [32]byte
NumTransactions int32
TransactionIds [][32]byte
// Transactions
Transactions []*Transaction Transactions []*Transaction
PrevBlockHash []byte
Hash []byte // Signature...
} }
// Serialize serializes the block // Serialize serializes the block
@ -56,20 +63,27 @@ func (b *Block) HashTransactions() []byte {
var txHash [32]byte var txHash [32]byte
for _, tx := range b.Transactions { for _, tx := range b.Transactions {
txHashes = append(txHashes, tx.ID) txHashes = append(txHashes, tx.ID[:])
} }
txHash = sha256.Sum256(bytes.Join(txHashes, []byte{})) txHash = sha256.Sum256(bytes.Join(txHashes, []byte{}))
return txHash[:] return txHash[:]
} }
// NewBlock creates and returns a neew block. // NewBlock creates and returns a neew block.
func NewBlock(transactions []*Transaction, prevBlockHash []byte) *Block { func NewBlock(transactions []*Transaction, prevBlockHash [32]byte) *Block {
block := &Block{time.Now().Unix(), transactions, prevBlockHash, []byte{}} numTxs := int32(len(transactions))
block.Hash = block.HashTransactions() var txIds [][32]byte
for _, tx := range transactions {
txIds = append(txIds, tx.ID)
}
block := &Block{time.Now().Unix(), prevBlockHash, [32]byte{},numTxs, txIds,transactions}
copy(block.Hash[:], block.HashTransactions()[:]) // TODO(Minh): the blockhash should be a hash of everything in the block
return block return block
} }
// NewGenesisBlock creates and returns genesis Block. // NewGenesisBlock creates and returns genesis Block.
func NewGenesisBlock(coinbase *Transaction) *Block { func NewGenesisBlock(coinbase *Transaction) *Block {
return NewBlock([]*Transaction{coinbase}, []byte{}) return NewBlock([]*Transaction{coinbase}, [32]byte{})
} }

@ -19,11 +19,11 @@ func TestBlockSerialize(t *testing.T) {
t.Errorf("Serialize or Deserialize incorrect at TimeStamp.") t.Errorf("Serialize or Deserialize incorrect at TimeStamp.")
} }
if bytes.Compare(block.PrevBlockHash, deserializedBlock.PrevBlockHash) != 0 { if bytes.Compare(block.PrevBlockHash[:], deserializedBlock.PrevBlockHash[:]) != 0 {
t.Errorf("Serialize or Deserialize incorrect at PrevBlockHash.") t.Errorf("Serialize or Deserialize incorrect at PrevBlockHash.")
} }
if bytes.Compare(block.Hash, deserializedBlock.Hash) != 0 { if bytes.Compare(block.Hash[:], deserializedBlock.Hash[:]) != 0 {
t.Errorf("Serialize or Deserialize incorrect at Hash.") t.Errorf("Serialize or Deserialize incorrect at Hash.")
} }
} }

@ -7,21 +7,29 @@ import (
// Blockchain keeps a sequence of Blocks // Blockchain keeps a sequence of Blocks
type Blockchain struct { type Blockchain struct {
blocks []*Block Blocks []*Block
} }
const genesisCoinbaseData = "The Times 03/Jan/2009 Chancellor on brink of second bailout for banks" const genesisCoinbaseData = "The Times 03/Jan/2009 Chancellor on brink of second bailout for banks"
// Get the latest block at the end of the chain
func (bc *Blockchain) GetLatestBlock() *Block{
if len(bc.Blocks) == 0 {
return nil
}
return bc.Blocks[len(bc.Blocks) - 1]
}
// FindUnspentTransactions returns a list of transactions containing unspent outputs // FindUnspentTransactions returns a list of transactions containing unspent outputs
func (bc *Blockchain) FindUnspentTransactions(address string) []Transaction { func (bc *Blockchain) FindUnspentTransactions(address string) []Transaction {
var unspentTXs []Transaction var unspentTXs []Transaction
spentTXOs := make(map[string][]int) spentTXOs := make(map[string][]int)
for index := len(bc.blocks) - 1; index >= 0; index-- { for index := len(bc.Blocks) - 1; index >= 0; index-- {
block := bc.blocks[index] block := bc.Blocks[index]
for _, tx := range block.Transactions { for _, tx := range block.Transactions {
txID := hex.EncodeToString(tx.ID) txID := hex.EncodeToString(tx.ID[:])
idx := -1 idx := -1
// TODO(minhdoan): Optimize this. // TODO(minhdoan): Optimize this.
@ -77,7 +85,7 @@ func (bc *Blockchain) FindSpendableOutputs(address string, amount int) (int, map
Work: Work:
for _, tx := range unspentTXs { for _, tx := range unspentTXs {
txID := hex.EncodeToString(tx.ID) txID := hex.EncodeToString(tx.ID[:])
for outIdx, txOutput := range tx.TxOutput { for outIdx, txOutput := range tx.TxOutput {
if txOutput.Address == address && accumulated < amount { if txOutput.Address == address && accumulated < amount {
@ -124,7 +132,7 @@ func (bc *Blockchain) NewUTXOTransaction(from, to string, amount int) *Transacti
outputs = append(outputs, TXOutput{acc - amount, from}) // a change outputs = append(outputs, TXOutput{acc - amount, from}) // a change
} }
tx := Transaction{nil, inputs, outputs} tx := Transaction{[32]byte{}, inputs, outputs}
tx.SetID() tx.SetID()
return &tx return &tx
@ -135,7 +143,7 @@ func (bc *Blockchain) NewUTXOTransaction(from, to string, amount int) *Transacti
func (bc *Blockchain) AddNewUserTransfer(utxoPool *UTXOPool, from, to string, amount int) bool { func (bc *Blockchain) AddNewUserTransfer(utxoPool *UTXOPool, from, to string, amount int) bool {
tx := bc.NewUTXOTransaction(from, to, amount) tx := bc.NewUTXOTransaction(from, to, amount)
if tx != nil { if tx != nil {
newBlock := NewBlock([]*Transaction{tx}, bc.blocks[len(bc.blocks)-1].Hash) newBlock := NewBlock([]*Transaction{tx}, bc.Blocks[len(bc.Blocks)-1].Hash)
if bc.VerifyNewBlockAndUpdate(utxoPool, newBlock) { if bc.VerifyNewBlockAndUpdate(utxoPool, newBlock) {
return true return true
} }
@ -145,18 +153,18 @@ func (bc *Blockchain) AddNewUserTransfer(utxoPool *UTXOPool, from, to string, am
// VerifyNewBlockAndUpdate verifies if the new coming block is valid for the current blockchain. // VerifyNewBlockAndUpdate verifies if the new coming block is valid for the current blockchain.
func (bc *Blockchain) VerifyNewBlockAndUpdate(utxopool *UTXOPool, block *Block) bool { func (bc *Blockchain) VerifyNewBlockAndUpdate(utxopool *UTXOPool, block *Block) bool {
length := len(bc.blocks) length := len(bc.Blocks)
if bytes.Compare(block.PrevBlockHash, bc.blocks[length-1].Hash) != 0 { if bytes.Compare(block.PrevBlockHash[:], bc.Blocks[length-1].Hash[:]) != 0 {
return false return false
} }
if block.Timestamp < bc.blocks[length-1].Timestamp { if block.Timestamp < bc.Blocks[length-1].Timestamp {
return false return false
} }
if utxopool != nil && !utxopool.VerifyAndUpdate(block.Transactions) { if utxopool != nil && !utxopool.VerifyAndUpdate(block.Transactions) {
return false return false
} }
bc.blocks = append(bc.blocks, block) bc.Blocks = append(bc.Blocks, block)
return true return true
} }

@ -68,7 +68,7 @@ func TestVerifyNewBlock(t *testing.T) {
if tx == nil { if tx == nil {
t.Error("failed to create a new transaction.") t.Error("failed to create a new transaction.")
} }
newBlock := NewBlock([]*Transaction{tx}, bc.blocks[len(bc.blocks)-1].Hash) newBlock := NewBlock([]*Transaction{tx}, bc.Blocks[len(bc.Blocks)-1].Hash)
if !bc.VerifyNewBlockAndUpdate(utxoPool, newBlock) { if !bc.VerifyNewBlockAndUpdate(utxoPool, newBlock) {
t.Error("failed to add a new valid block.") t.Error("failed to add a new valid block.")

@ -11,7 +11,7 @@ import (
// Transaction represents a Bitcoin transaction // Transaction represents a Bitcoin transaction
type Transaction struct { type Transaction struct {
ID []byte // 32 byte hash ID [32]byte // 32 byte hash
TxInput []TXInput TxInput []TXInput
TxOutput []TXOutput TxOutput []TXOutput
} }
@ -43,7 +43,7 @@ func (tx *Transaction) SetID() {
log.Panic(err) log.Panic(err)
} }
hash = sha256.Sum256(encoded.Bytes()) hash = sha256.Sum256(encoded.Bytes())
tx.ID = hash[:] tx.ID = hash
} }
// NewCoinbaseTX creates a new coinbase transaction // NewCoinbaseTX creates a new coinbase transaction
@ -54,7 +54,7 @@ func NewCoinbaseTX(to, data string) *Transaction {
txin := TXInput{[]byte{}, -1, data} txin := TXInput{[]byte{}, -1, data}
txout := TXOutput{DefaultCoinbaseValue, to} txout := TXOutput{DefaultCoinbaseValue, to}
tx := Transaction{nil, []TXInput{txin}, []TXOutput{txout}} tx := Transaction{[32]byte{}, []TXInput{txin}, []TXOutput{txout}}
tx.SetID() tx.SetID()
return &tx return &tx
} }
@ -76,7 +76,7 @@ func (txOutput *TXOutput) String() string {
// Used for debuging. // Used for debuging.
func (tx *Transaction) String() string { func (tx *Transaction) String() string {
res := fmt.Sprintf("ID: %v\n", hex.EncodeToString(tx.ID)) res := fmt.Sprintf("ID: %v\n", hex.EncodeToString(tx.ID[:]))
res += fmt.Sprintf("TxInput:\n") res += fmt.Sprintf("TxInput:\n")
for id, value := range tx.TxInput { for id, value := range tx.TxInput {
res += fmt.Sprintf("%v: %v\n", id, value.String()) res += fmt.Sprintf("%v: %v\n", id, value.String())

@ -2,19 +2,38 @@ package blockchain
import ( import (
"encoding/hex" "encoding/hex"
"fmt"
)
const (
// MaxNumberOfTransactions is the max number of transaction per a block.
MaxNumberOfTransactions = 100
) )
// UTXOPool stores transactions and balance associated with each address. // UTXOPool stores transactions and balance associated with each address.
type UTXOPool struct { type UTXOPool struct {
// Mapping from address to a map of transaction id to a map of the index of output // Mapping from address to a map of transaction id to a map of the index of output
// array in that transaction to that balance. // array in that transaction to that balance.
utxo map[string]map[string]map[int]int /*
The 3-d map's structure:
address - [
txId1 - [
outputIndex1 - value1
outputIndex2 - value2
]
txId2 - [
outputIndex1 - value1
outputIndex2 - value2
]
]
*/
UtxoMap map[string]map[string]map[int]int
} }
// VerifyTransactions verifies if a list of transactions valid. // VerifyTransactions verifies if a list of transactions valid.
func (utxopool *UTXOPool) VerifyTransactions(transactions []*Transaction) bool { func (utxoPool *UTXOPool) VerifyTransactions(transactions []*Transaction) bool {
spentTXOs := make(map[string]map[string]map[int]bool) spentTXOs := make(map[string]map[string]map[int]bool)
if utxopool != nil { if utxoPool != nil {
for _, tx := range transactions { for _, tx := range transactions {
inTotal := 0 inTotal := 0
// Calculate the sum of TxInput // Calculate the sum of TxInput
@ -37,7 +56,7 @@ func (utxopool *UTXOPool) VerifyTransactions(transactions []*Transaction) bool {
spentTXOs[in.Address][inTxID][index] = true spentTXOs[in.Address][inTxID][index] = true
// Sum the balance up to the inTotal. // Sum the balance up to the inTotal.
if val, ok := utxopool.utxo[in.Address][inTxID][index]; ok { if val, ok := utxoPool.UtxoMap[in.Address][inTxID][index]; ok {
inTotal += val inTotal += val
} else { } else {
return false return false
@ -57,57 +76,161 @@ func (utxopool *UTXOPool) VerifyTransactions(transactions []*Transaction) bool {
return true return true
} }
// VerifyOneTransaction verifies if a list of transactions valid.
func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction) bool {
spentTXOs := make(map[string]map[string]map[int]bool)
txID := hex.EncodeToString(tx.ID[:])
inTotal := 0
// Calculate the sum of TxInput
for _, in := range tx.TxInput {
inTxID := hex.EncodeToString(in.TxID)
index := in.TxOutputIndex
// Check if the transaction with the addres is spent or not.
if val, ok := utxoPool.UtxoMap[in.Address][inTxID][index]; ok {
inTotal += val
} else {
return false
}
// Mark the transactions with the address and index spent.
if _, ok := spentTXOs[in.Address]; !ok {
spentTXOs[in.Address] = make(map[string]map[int]bool)
}
if _, ok := spentTXOs[in.Address][inTxID]; !ok {
spentTXOs[in.Address][inTxID] = make(map[int]bool)
}
if spentTXOs[in.Address][inTxID][index] {
return false
}
spentTXOs[in.Address][inTxID][index] = true
}
outTotal := 0
// Calculate the sum of TxOutput
for index, out := range tx.TxOutput {
if _, ok := spentTXOs[out.Address][txID][index]; ok {
return false
}
outTotal += out.Value
}
if inTotal != outTotal {
return false
}
return true
}
// UpdateOneTransaction updates utxoPool in respect to the new Transaction.
func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
if utxoPool != nil {
txID := hex.EncodeToString(tx.ID[:])
// Remove
for _, in := range tx.TxInput {
inTxID := hex.EncodeToString(in.TxID)
delete(utxoPool.UtxoMap[in.Address][inTxID], in.TxOutputIndex)
}
// Update
for index, out := range tx.TxOutput {
if _, ok := utxoPool.UtxoMap[out.Address]; !ok {
utxoPool.UtxoMap[out.Address] = make(map[string]map[int]int)
utxoPool.UtxoMap[out.Address][txID] = make(map[int]int)
}
if _, ok := utxoPool.UtxoMap[out.Address][txID]; !ok {
utxoPool.UtxoMap[out.Address][txID] = make(map[int]int)
}
utxoPool.UtxoMap[out.Address][txID][index] = out.Value
}
}
}
// VerifyOneTransactionAndUpdate verifies and update a valid transaction.
// Return false if the transaction is not valid.
func (utxoPool *UTXOPool) VerifyOneTransactionAndUpdate(tx *Transaction) bool {
if utxoPool.VerifyOneTransaction(tx) {
utxoPool.UpdateOneTransaction(tx)
return true
}
return false
}
// VerifyAndUpdate verifies a list of transactions and update utxoPool. // VerifyAndUpdate verifies a list of transactions and update utxoPool.
func (utxopool *UTXOPool) VerifyAndUpdate(transactions []*Transaction) bool { func (utxoPool *UTXOPool) VerifyAndUpdate(transactions []*Transaction) bool {
if utxopool.VerifyTransactions(transactions) { if utxoPool.VerifyTransactions(transactions) {
utxopool.Update(transactions) utxoPool.Update(transactions)
return true return true
} }
return false return false
} }
// Update utxo balances with a list of new transactions. // Update Utxo balances with a list of new transactions.
func (utxopool *UTXOPool) Update(transactions []*Transaction) { func (utxoPool *UTXOPool) Update(transactions []*Transaction) {
if utxopool != nil { if utxoPool != nil {
for _, tx := range transactions { for _, tx := range transactions {
curTxID := hex.EncodeToString(tx.ID) curTxID := hex.EncodeToString(tx.ID[:])
// Remove // Remove
for _, in := range tx.TxInput { for _, in := range tx.TxInput {
inTxID := hex.EncodeToString(in.TxID) inTxID := hex.EncodeToString(in.TxID)
delete(utxopool.utxo[in.Address][inTxID], in.TxOutputIndex) delete(utxoPool.UtxoMap[in.Address][inTxID], in.TxOutputIndex)
} }
// Update // Update
for index, out := range tx.TxOutput { for index, out := range tx.TxOutput {
if _, ok := utxopool.utxo[out.Address]; !ok { if _, ok := utxoPool.UtxoMap[out.Address]; !ok {
utxopool.utxo[out.Address] = make(map[string]map[int]int) utxoPool.UtxoMap[out.Address] = make(map[string]map[int]int)
utxopool.utxo[out.Address][curTxID] = make(map[int]int) utxoPool.UtxoMap[out.Address][curTxID] = make(map[int]int)
} }
if _, ok := utxopool.utxo[out.Address][curTxID]; !ok { if _, ok := utxoPool.UtxoMap[out.Address][curTxID]; !ok {
utxopool.utxo[out.Address][curTxID] = make(map[int]int) utxoPool.UtxoMap[out.Address][curTxID] = make(map[int]int)
} }
utxopool.utxo[out.Address][curTxID][index] = out.Value utxoPool.UtxoMap[out.Address][curTxID][index] = out.Value
} }
} }
} }
} }
// CreateUTXOPoolFromTransaction a utxo pool from a genesis transaction. // CreateUTXOPoolFromTransaction a Utxo pool from a genesis transaction.
func CreateUTXOPoolFromTransaction(tx *Transaction) *UTXOPool { func CreateUTXOPoolFromTransaction(tx *Transaction) *UTXOPool {
var utxoPool UTXOPool var utxoPool UTXOPool
txID := hex.EncodeToString(tx.ID) txID := hex.EncodeToString(tx.ID[:])
utxoPool.utxo = make(map[string]map[string]map[int]int) utxoPool.UtxoMap = make(map[string]map[string]map[int]int)
for index, out := range tx.TxOutput { for index, out := range tx.TxOutput {
utxoPool.utxo[out.Address] = make(map[string]map[int]int) utxoPool.UtxoMap[out.Address] = make(map[string]map[int]int)
utxoPool.utxo[out.Address][txID] = make(map[int]int) utxoPool.UtxoMap[out.Address][txID] = make(map[int]int)
utxoPool.utxo[out.Address][txID][index] = out.Value utxoPool.UtxoMap[out.Address][txID][index] = out.Value
} }
return &utxoPool return &utxoPool
} }
// CreateUTXOPoolFromGenesisBlockChain a utxo pool from a genesis blockchain. // CreateUTXOPoolFromGenesisBlockChain a Utxo pool from a genesis blockchain.
func CreateUTXOPoolFromGenesisBlockChain(bc *Blockchain) *UTXOPool { func CreateUTXOPoolFromGenesisBlockChain(bc *Blockchain) *UTXOPool {
tx := bc.blocks[0].Transactions[0] tx := bc.Blocks[0].Transactions[0]
return CreateUTXOPoolFromTransaction(tx) return CreateUTXOPoolFromTransaction(tx)
} }
// SelectTransactionsForNewBlock returns a list of index of valid transactions for the new block.
func (utxoPool *UTXOPool) SelectTransactionsForNewBlock(transactions []*Transaction) ([]*Transaction, []*Transaction) {
selected, unselected := []*Transaction{}, []*Transaction{}
for _, tx := range transactions {
if len(selected) < MaxNumberOfTransactions && utxoPool.VerifyOneTransaction(tx) {
selected = append(selected, tx)
} else {
unselected = append(unselected, tx)
}
}
return selected, unselected
}
// Used for debugging.
func (utxoPool *UTXOPool) String() string {
res := ""
for address, v1 := range utxoPool.UtxoMap {
for txid, v2 := range v1 {
for index, value := range v2 {
res += fmt.Sprintf("address: %v, tx id: %v, index: %v, value: %v\n", address, txid, index, value)
}
}
}
return res
}

@ -4,8 +4,20 @@ import (
"testing" "testing"
) )
func TestVerifyTransactions(t *testing.T) { func TestVerifyOneTransactionAndUpdate(t *testing.T) {
if cbtx := NewCoinbaseTX("minh", genesisCoinbaseData); cbtx == nil { bc := CreateBlockchain("minh")
t.Errorf("failed to create a coinbase transaction.") utxoPool := CreateUTXOPoolFromGenesisBlockChain(bc)
bc.AddNewUserTransfer(utxoPool, "minh", "alok", 3)
bc.AddNewUserTransfer(utxoPool, "minh", "rj", 100)
tx := bc.NewUTXOTransaction("minh", "mark", 10)
if tx == nil {
t.Error("failed to create a new transaction.")
}
if !utxoPool.VerifyOneTransaction(tx) {
t.Error("failed to verify a valid transaction.")
} }
utxoPool.VerifyOneTransactionAndUpdate(tx)
} }

@ -2,47 +2,36 @@
package consensus // consensus package consensus // consensus
import ( import (
"fmt"
"harmony-benchmark/blockchain"
"harmony-benchmark/common" "harmony-benchmark/common"
"harmony-benchmark/log"
"harmony-benchmark/p2p" "harmony-benchmark/p2p"
"log"
"regexp" "regexp"
"strconv" "strconv"
) )
// Consensus data containing all info related to one consensus process // Consensus data containing all info related to one consensus process
type Consensus struct { type Consensus struct {
state ConsensusState state ConsensusState
// Signatures collected from validators commits map[string]string // Signatures collected from validators
commits map[string]string responses map[string]string // Signatures collected from validators
// Signatures collected from validators data string // Actual block data to reach consensus on
responses map[string]string validators []p2p.Peer // List of validators
// Actual block data to reach consensus on leader p2p.Peer // Leader
data string priKey string // private key of current node
// List of validators IsLeader bool // Whether I am leader. False means I am validator
validators []p2p.Peer nodeId uint16 // Leader or validator Id - 2 byte
// Leader consensusId uint32 // Consensus Id (View Id) - 4 byte
leader p2p.Peer blockHash [32]byte // Blockhash - 32 byte
// private key of current node blockHeader []byte // BlockHeader to run consensus on
priKey string ShardIDShardID uint32 // Shard Id which this node belongs to
// Whether I am leader. False means I am validator ReadySignal chan int // Signal channel for starting a new consensus process
IsLeader bool BlockVerifier func(*blockchain.Block) bool // The verifier func passed from Node object
// Leader or validator Id - 2 byte OnConsensusDone func(*blockchain.Block) // The post-consensus processing func passed from Node object. Called when consensus on a new block is done
nodeId uint16 msgCategory byte // Network related fields
// Consensus Id (View Id) - 4 byte actionType byte
consensusId uint32 Log log.Logger
// Blockhash - 32 byte
blockHash []byte
// BlockHeader to run consensus on
blockHeader []byte
// Shard Id which this node belongs to
ShardId uint32
// Signal channel for starting a new consensus process
ReadySignal chan int
//// Network related fields
msgCategory byte
actionType byte
} }
// Consensus state enum for both leader and validator // Consensus state enum for both leader and validator
@ -77,8 +66,10 @@ func (state ConsensusState) String() string {
return names[state] return names[state]
} }
// Create a new Consensus object // NewConsensus creates a new Consensus object
func NewConsensus(ip, port, shardId string, peers []p2p.Peer, leader p2p.Peer) Consensus { // TODO(minhdoan): Maybe convert it into just New
// FYI, see https://golang.org/doc/effective_go.html?#package-names
func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) Consensus {
// The first Ip, port passed will be leader. // The first Ip, port passed will be leader.
consensus := Consensus{} consensus := Consensus{}
peer := p2p.Peer{Port: port, Ip: ip} peer := p2p.Peer{Port: port, Ip: ip}
@ -98,14 +89,14 @@ func NewConsensus(ip, port, shardId string, peers []p2p.Peer, leader p2p.Peer) C
reg, err := regexp.Compile("[^0-9]+") reg, err := regexp.Compile("[^0-9]+")
if err != nil { if err != nil {
log.Fatal(err) consensus.Log.Crit("Regex Compilation Failed", "err", err, "consensus", consensus)
} }
consensus.consensusId = 0 consensus.consensusId = 0
myShardId, err := strconv.Atoi(shardId) myShardIDShardID, err := strconv.Atoi(ShardID)
if err != nil { if err != nil {
panic("Unparseable shard Id" + shardId) panic("Unparseable shard Id" + ShardID)
} }
consensus.ShardId = uint32(myShardId) consensus.ShardIDShardID = uint32(myShardIDShardID)
// For now use socket address as 16 byte Id // For now use socket address as 16 byte Id
// TODO: populate with correct Id // TODO: populate with correct Id
@ -123,6 +114,8 @@ func NewConsensus(ip, port, shardId string, peers []p2p.Peer, leader p2p.Peer) C
consensus.msgCategory = byte(common.COMMITTEE) consensus.msgCategory = byte(common.COMMITTEE)
consensus.actionType = byte(common.CONSENSUS) consensus.actionType = byte(common.CONSENSUS)
consensus.Log = log.New()
return consensus return consensus
} }
@ -132,3 +125,14 @@ func (consensus *Consensus) ResetState() {
consensus.commits = make(map[string]string) consensus.commits = make(map[string]string)
consensus.responses = make(map[string]string) consensus.responses = make(map[string]string)
} }
// Returns ID of this consensus
func (consensus *Consensus) String() string {
var duty string
if consensus.IsLeader {
duty = "LDR" // leader
} else {
duty = "VLD" // validator
}
return fmt.Sprintf("[%s, %s, %v, %v]", duty, consensus.priKey, consensus.ShardIDShardID, consensus.nodeId)
}

@ -1,21 +1,22 @@
package consensus package consensus
import ( import (
"log"
"sync" "sync"
"bytes" "bytes"
"crypto/sha256"
"encoding/binary" "encoding/binary"
"errors"
"fmt"
"harmony-benchmark/blockchain" "harmony-benchmark/blockchain"
"harmony-benchmark/p2p" "harmony-benchmark/p2p"
"strings"
"encoding/gob"
) )
var mutex = &sync.Mutex{} var mutex = &sync.Mutex{}
// WaitForNewBlock waits for a new block. // WaitForNewBlock waits for a new block.
func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) { func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block) {
consensus.Log.Debug("Waiting for block", "consensus", consensus)
for { // keep waiting for new blocks for { // keep waiting for new blocks
newBlock := <-blockChannel newBlock := <-blockChannel
// TODO: think about potential race condition // TODO: think about potential race condition
@ -29,28 +30,27 @@ func (consensus *Consensus) WaitForNewBlock(blockChannel chan blockchain.Block)
func (consensus *Consensus) ProcessMessageLeader(message []byte) { func (consensus *Consensus) ProcessMessageLeader(message []byte) {
msgType, err := GetConsensusMessageType(message) msgType, err := GetConsensusMessageType(message)
if err != nil { if err != nil {
log.Print(err) consensus.Log.Error("Failed to get consensus message type.", "err", err, "consensus", consensus)
} }
payload, err := GetConsensusMessagePayload(message) payload, err := GetConsensusMessagePayload(message)
if err != nil { if err != nil {
log.Print(err) consensus.Log.Error("Failed to get consensus message payload.", "err", err, "consensus", consensus)
} }
log.Printf("[Leader-%d] Received and processing message: %s\n", consensus.ShardId, msgType)
switch msgType { switch msgType {
case ANNOUNCE: case ANNOUNCE:
log.Printf("Unexpected message type: %s", msgType) consensus.Log.Error("Unexpected message type", "msgType", msgType, "consensus", consensus)
case COMMIT: case COMMIT:
consensus.processCommitMessage(payload) consensus.processCommitMessage(payload)
case CHALLENGE: case CHALLENGE:
log.Printf("Unexpected message type: %s", msgType) consensus.Log.Error("Unexpected message type", "msgType", msgType, "consensus", consensus)
case RESPONSE: case RESPONSE:
consensus.processResponseMessage(payload) consensus.processResponseMessage(payload)
case START_CONSENSUS: case START_CONSENSUS:
consensus.processStartConsensusMessage(payload) consensus.processStartConsensusMessage(payload)
default: default:
log.Printf("Unexpected message type: %s", msgType) consensus.Log.Error("Unexpected message type", "msgType", msgType, "consensus", consensus)
} }
} }
@ -62,21 +62,23 @@ func (consensus *Consensus) processStartConsensusMessage(payload []byte) {
func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) { func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) {
// prepare message and broadcast to validators // prepare message and broadcast to validators
// Construct new block
//newBlock := constructNewBlock()
consensus.blockHash = newBlock.Hash
msgToSend, err := consensus.constructAnnounceMessage() // Copy over block hash and block header data
if err != nil { copy(consensus.blockHash[:], newBlock.Hash[:])
return
} byteBuffer := bytes.NewBuffer([]byte{})
encoder := gob.NewEncoder(byteBuffer)
encoder.Encode(newBlock)
consensus.blockHeader = byteBuffer.Bytes()
msgToSend := consensus.constructAnnounceMessage()
// Set state to ANNOUNCE_DONE // Set state to ANNOUNCE_DONE
consensus.state = ANNOUNCE_DONE consensus.state = ANNOUNCE_DONE
p2p.BroadcastMessage(consensus.validators, msgToSend) p2p.BroadcastMessage(consensus.validators, msgToSend)
} }
// Construct the announce message to send to validators // Construct the announce message to send to validators
func (consensus Consensus) constructAnnounceMessage() ([]byte, error) { func (consensus Consensus) constructAnnounceMessage() []byte {
buffer := bytes.NewBuffer([]byte{}) buffer := bytes.NewBuffer([]byte{})
// 4 byte consensus id // 4 byte consensus id
@ -85,10 +87,7 @@ func (consensus Consensus) constructAnnounceMessage() ([]byte, error) {
buffer.Write(fourBytes) buffer.Write(fourBytes)
// 32 byte block hash // 32 byte block hash
if len(consensus.blockHash) != 32 { buffer.Write(consensus.blockHash[:])
return buffer.Bytes(), errors.New(fmt.Sprintf("Block Hash size is %d bytes", len(consensus.blockHash)))
}
buffer.Write(consensus.blockHash)
// 2 byte leader id // 2 byte leader id
twoBytes := make([]byte, 2) twoBytes := make([]byte, 2)
@ -96,11 +95,10 @@ func (consensus Consensus) constructAnnounceMessage() ([]byte, error) {
buffer.Write(twoBytes) buffer.Write(twoBytes)
// n byte of block header // n byte of block header
blockHeader := getBlockHeader() buffer.Write(consensus.blockHeader)
buffer.Write(blockHeader)
// 4 byte of payload size // 4 byte of payload size
sizeOfPayload := uint32(len(blockHeader)) sizeOfPayload := uint32(len(consensus.blockHeader))
binary.BigEndian.PutUint32(fourBytes, sizeOfPayload) binary.BigEndian.PutUint32(fourBytes, sizeOfPayload)
buffer.Write(fourBytes) buffer.Write(fourBytes)
@ -108,27 +106,13 @@ func (consensus Consensus) constructAnnounceMessage() ([]byte, error) {
signature := signMessage(buffer.Bytes()) signature := signMessage(buffer.Bytes())
buffer.Write(signature) buffer.Write(signature)
return consensus.ConstructConsensusMessage(ANNOUNCE, buffer.Bytes()), nil return consensus.ConstructConsensusMessage(ANNOUNCE, buffer.Bytes())
}
// TODO: fill in this function
func constructNewBlock() []byte {
return make([]byte, 200)
} }
// TODO: fill in this function
func getBlockHash(block []byte) []byte {
return make([]byte, 32)
}
// TODO: fill in this function
func getBlockHeader() []byte {
return make([]byte, 200)
}
// TODO: fill in this function
func signMessage(message []byte) []byte { func signMessage(message []byte) []byte {
return make([]byte, 64) // TODO: implement real ECC signature
mockSignature := sha256.Sum256(message)
return append(mockSignature[:], mockSignature[:]...)
} }
func (consensus *Consensus) processCommitMessage(payload []byte) { func (consensus *Consensus) processCommitMessage(payload []byte) {
@ -167,7 +151,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte) {
shouldProcess := !ok && consensus.state == ANNOUNCE_DONE shouldProcess := !ok && consensus.state == ANNOUNCE_DONE
if shouldProcess { if shouldProcess {
consensus.commits[validatorId] = validatorId consensus.commits[validatorId] = validatorId
log.Printf("Number of commits received: %d", len(consensus.commits)) //consensus.Log.Debug("Number of commits received", "count", len(consensus.commits))
} }
mutex.Unlock() mutex.Unlock()
@ -177,7 +161,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte) {
mutex.Lock() mutex.Lock()
if len(consensus.commits) >= (2*len(consensus.validators))/3+1 { if len(consensus.commits) >= (2*len(consensus.validators))/3+1 {
log.Printf("Enough commits received with %d signatures", len(consensus.commits)) consensus.Log.Debug("Enough commits received with signatures", "numOfSignatures", len(consensus.commits))
if consensus.state == ANNOUNCE_DONE { if consensus.state == ANNOUNCE_DONE {
// Set state to CHALLENGE_DONE // Set state to CHALLENGE_DONE
consensus.state = CHALLENGE_DONE consensus.state = CHALLENGE_DONE
@ -199,7 +183,7 @@ func (consensus Consensus) constructChallengeMessage() []byte {
buffer.Write(fourBytes) buffer.Write(fourBytes)
// 32 byte block hash // 32 byte block hash
buffer.Write(consensus.blockHash) buffer.Write(consensus.blockHash[:])
// 2 byte leader id // 2 byte leader id
twoBytes := make([]byte, 2) twoBytes := make([]byte, 2)
@ -207,10 +191,10 @@ func (consensus Consensus) constructChallengeMessage() []byte {
buffer.Write(twoBytes) buffer.Write(twoBytes)
// 33 byte aggregated commit // 33 byte aggregated commit
buffer.Write(getAggregatedCommit()) buffer.Write(getAggregatedCommit(consensus.commits))
// 33 byte aggregated key // 33 byte aggregated key
buffer.Write(getAggregatedKey()) buffer.Write(getAggregatedKey(consensus.commits))
// 32 byte challenge // 32 byte challenge
buffer.Write(getChallenge()) buffer.Write(getChallenge())
@ -222,18 +206,30 @@ func (consensus Consensus) constructChallengeMessage() []byte {
return consensus.ConstructConsensusMessage(CHALLENGE, buffer.Bytes()) return consensus.ConstructConsensusMessage(CHALLENGE, buffer.Bytes())
} }
// TODO: fill in this function func getAggregatedCommit(commits map[string]string) []byte {
func getAggregatedCommit() []byte { // TODO: implement actual commit aggregation
return make([]byte, 33) var commitArray []string
for _, val := range commits {
commitArray = append(commitArray, val)
}
var commit [32]byte
commit = sha256.Sum256([]byte(strings.Join(commitArray, "")))
return append(commit[:], byte(0))
} }
// TODO: fill in this function func getAggregatedKey(commits map[string]string) []byte {
func getAggregatedKey() []byte { // TODO: implement actual key aggregation
return make([]byte, 33) var commitArray []string
for key := range commits {
commitArray = append(commitArray, key)
}
var commit [32]byte
commit = sha256.Sum256([]byte(strings.Join(commitArray, "")))
return append(commit[:], byte(0))
} }
// TODO: fill in this function
func getChallenge() []byte { func getChallenge() []byte {
// TODO: implement actual challenge data
return make([]byte, 32) return make([]byte, 32)
} }
@ -272,7 +268,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
shouldProcess := !ok && consensus.state == CHALLENGE_DONE shouldProcess := !ok && consensus.state == CHALLENGE_DONE
if shouldProcess { if shouldProcess {
consensus.responses[validatorId] = validatorId consensus.responses[validatorId] = validatorId
log.Printf("Number of responses received: %d", len(consensus.responses)) //consensus.Log.Debug("Number of responses received", "count", len(consensus.responses))
} }
mutex.Unlock() mutex.Unlock()
@ -282,14 +278,28 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
mutex.Lock() mutex.Lock()
if len(consensus.responses) >= (2*len(consensus.validators))/3+1 { if len(consensus.responses) >= (2*len(consensus.validators))/3+1 {
log.Printf("Consensus reached with %d signatures.", len(consensus.responses)) consensus.Log.Debug("Consensus reached with signatures.", "numOfSignatures", len(consensus.responses))
if consensus.state == CHALLENGE_DONE { if consensus.state == CHALLENGE_DONE {
// Set state to FINISHED // Set state to FINISHED
consensus.state = FINISHED consensus.state = FINISHED
// TODO: do followups on the consensus // TODO: do followups on the consensus
log.Printf("[Shard %d] HOORAY!!! CONSENSUS REACHED AMONG %d NODES WITH CONSENSUS ID %d!!!\n", consensus.ShardId, len(consensus.validators), consensus.consensusId)
consensus.Log.Debug("HOORAY!!! CONSENSUS REACHED!!!", "numOfNodes", len(consensus.validators))
consensus.ResetState() consensus.ResetState()
// TODO: reconstruct the whole block from header and transactions
// For now, we used the stored whole block in consensus.blockHeader
txDecoder := gob.NewDecoder(bytes.NewReader(consensus.blockHeader))
var blockHeaderObj blockchain.Block
err := txDecoder.Decode(&blockHeaderObj)
if err != nil {
consensus.Log.Debug("failed to construct the new block after consensus")
}
consensus.OnConsensusDone(&blockHeaderObj)
consensus.consensusId++ consensus.consensusId++
// Send signal to Node so the new block can be added and new round of consensus can be triggered
consensus.ReadySignal <- 1 consensus.ReadySignal <- 1
} }
// TODO: composes new block and broadcast the new block to validators // TODO: composes new block and broadcast the new block to validators

@ -6,16 +6,13 @@ import (
) )
func TestConstructAnnounceMessage(test *testing.T) { func TestConstructAnnounceMessage(test *testing.T) {
header := getBlockHeader()
leader := p2p.Peer{Ip: "1", Port: "2"} leader := p2p.Peer{Ip: "1", Port: "2"}
validator := p2p.Peer{Ip: "3", Port: "5"} validator := p2p.Peer{Ip: "3", Port: "5"}
consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
consensus.blockHash = getBlockHash(make([]byte, 10)) consensus.blockHash = [32]byte{}
msg, err := consensus.constructAnnounceMessage() header := consensus.blockHeader
msg := consensus.constructAnnounceMessage()
if err != nil {
test.Error("Annouce message is not constructed successfully")
}
if len(msg) != 1+1+1+4+32+2+4+64+len(header) { if len(msg) != 1+1+1+4+32+2+4+64+len(header) {
test.Errorf("Annouce message is not constructed in the correct size: %d", len(msg)) test.Errorf("Annouce message is not constructed in the correct size: %d", len(msg))
} }
@ -25,7 +22,7 @@ func TestConstructChallengeMessage(test *testing.T) {
leader := p2p.Peer{Ip: "1", Port: "2"} leader := p2p.Peer{Ip: "1", Port: "2"}
validator := p2p.Peer{Ip: "3", Port: "5"} validator := p2p.Peer{Ip: "3", Port: "5"}
consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
consensus.blockHash = getBlockHash(make([]byte, 10)) consensus.blockHash = [32]byte{}
msg := consensus.constructChallengeMessage() msg := consensus.constructChallengeMessage()
if len(msg) != 1+1+1+4+32+2+33+33+32+64 { if len(msg) != 1+1+1+4+32+2+33+33+32+64 {

@ -4,33 +4,35 @@ import (
"bytes" "bytes"
"encoding/binary" "encoding/binary"
"harmony-benchmark/p2p" "harmony-benchmark/p2p"
"log" "strconv"
"regexp"
"encoding/gob"
"harmony-benchmark/blockchain"
) )
// Validator's consensus message dispatcher // Validator's consensus message dispatcher
func (consensus *Consensus) ProcessMessageValidator(message []byte) { func (consensus *Consensus) ProcessMessageValidator(message []byte) {
msgType, err := GetConsensusMessageType(message) msgType, err := GetConsensusMessageType(message)
if err != nil { if err != nil {
log.Print(err) consensus.Log.Error("Failed to get consensus message type", "err", err, "consensus", consensus)
} }
payload, err := GetConsensusMessagePayload(message) payload, err := GetConsensusMessagePayload(message)
if err != nil { if err != nil {
log.Print(err) consensus.Log.Error("Failed to get consensus message payload", "err", err, "consensus", consensus)
} }
log.Printf("[Validator] Received and processing message: %s\n", msgType)
switch msgType { switch msgType {
case ANNOUNCE: case ANNOUNCE:
consensus.processAnnounceMessage(payload) consensus.processAnnounceMessage(payload)
case COMMIT: case COMMIT:
log.Printf("Unexpected message type: %s", msgType) consensus.Log.Error("Unexpected message type", "msgType", msgType, "consensus", consensus)
case CHALLENGE: case CHALLENGE:
consensus.processChallengeMessage(payload) consensus.processChallengeMessage(payload)
case RESPONSE: case RESPONSE:
log.Printf("Unexpected message type: %s", msgType) consensus.Log.Error("Unexpected message type", "msgType", msgType, "consensus", consensus)
default: default:
log.Printf("Unexpected message type: %s", msgType) consensus.Log.Error("Unexpected message type", "msgType", msgType, "consensus", consensus)
} }
} }
@ -45,8 +47,8 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) {
blockHash := payload[offset : offset+32] blockHash := payload[offset : offset+32]
offset += 32 offset += 32
// 2 byte validator id // 2 byte leader id
leaderId := string(payload[offset : offset+2]) leaderId := binary.BigEndian.Uint16(payload[offset : offset+2])
offset += 2 offset += 2
// n byte of block header // n byte of block header
@ -65,19 +67,52 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) {
// TODO: make use of the data. This is just to avoid the unused variable warning // TODO: make use of the data. This is just to avoid the unused variable warning
_ = consensusId _ = consensusId
_ = blockHash
_ = leaderId _ = leaderId
_ = blockHeader _ = blockHeader
_ = blockHeaderSize _ = blockHeaderSize
_ = signature _ = signature
consensus.blockHash = blockHash copy(consensus.blockHash[:], blockHash[:])
// verify block data
// Verify block data
// check consensus Id
if consensusId != consensus.consensusId { if consensusId != consensus.consensusId {
log.Printf("Received message with consensus Id: %d. My consensus Id: %d\n", consensusId, consensus.consensusId) consensus.Log.Debug("[ERROR] Received message with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId)
return
}
// check leader Id
leaderPrivKey := consensus.leader.Ip + consensus.leader.Port
reg, _ := regexp.Compile("[^0-9]+")
socketId := reg.ReplaceAllString(leaderPrivKey, "")
value, _ := strconv.Atoi(socketId)
if leaderId != uint16(value) {
consensus.Log.Debug("[ERROR] Received message from wrong leader", "myLeaderId", consensus.consensusId, "receivedLeaderId", consensusId)
return
}
// check block header is valid
txDecoder := gob.NewDecoder(bytes.NewReader(blockHeader))
var blockHeaderObj blockchain.Block // TODO: separate header from block. Right now, this blockHeader data is actually the whole block
err := txDecoder.Decode(&blockHeaderObj)
if err != nil {
consensus.Log.Debug("[ERROR] Unparseable block header data")
return
}
consensus.blockHeader = blockHeader
// check block hash
if bytes.Compare(blockHash[:], blockHeaderObj.HashTransactions()[:]) != 0 || bytes.Compare(blockHeaderObj.Hash[:], blockHeaderObj.HashTransactions()[:]) != 0 {
consensus.Log.Debug("[ERROR] Block hash doesn't match")
return
}
// check block data (transactions
if !consensus.BlockVerifier(&blockHeaderObj) {
consensus.Log.Debug("[ERROR] Block content is not verified successfully")
return return
} }
// sign block
// TODO: return the signature(commit) to leader // TODO: return the signature(commit) to leader
// For now, simply return the private key of this node. // For now, simply return the private key of this node.
@ -98,7 +133,7 @@ func (consensus Consensus) constructCommitMessage() []byte {
buffer.Write(fourBytes) buffer.Write(fourBytes)
// 32 byte block hash // 32 byte block hash
buffer.Write(consensus.blockHash) buffer.Write(consensus.blockHash[:])
// 2 byte validator id // 2 byte validator id
twoBytes := make([]byte, 2) twoBytes := make([]byte, 2)
@ -116,8 +151,8 @@ func (consensus Consensus) constructCommitMessage() []byte {
return consensus.ConstructConsensusMessage(COMMIT, buffer.Bytes()) return consensus.ConstructConsensusMessage(COMMIT, buffer.Bytes())
} }
// TODO: fill in this function
func getCommitMessage() []byte { func getCommitMessage() []byte {
// TODO: use real cosi signature
return make([]byte, 33) return make([]byte, 33)
} }
@ -133,7 +168,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) {
offset += 32 offset += 32
// 2 byte leader id // 2 byte leader id
leaderId := string(payload[offset : offset+2]) leaderId := binary.BigEndian.Uint16(payload[offset : offset+2])
offset += 2 offset += 2
// 33 byte of aggregated commit // 33 byte of aggregated commit
@ -162,13 +197,30 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) {
_ = challenge _ = challenge
_ = signature _ = signature
// verify block data and the aggregated signatures // erify block data and the aggregated signatures
// check consensus Id
if consensusId != consensus.consensusId { if consensusId != consensus.consensusId {
log.Printf("Received message with consensus Id: %d. My consensus Id: %d\n", consensusId, consensus.consensusId) consensus.Log.Debug("[ERROR] Received message with wrong consensus Id", "myConsensusId", consensus.consensusId, "theirConsensusId", consensusId)
return return
} }
// sign the message // check leader Id
leaderPrivKey := consensus.leader.Ip + consensus.leader.Port
reg, _ := regexp.Compile("[^0-9]+")
socketId := reg.ReplaceAllString(leaderPrivKey, "")
value, _ := strconv.Atoi(socketId)
if leaderId != uint16(value) {
consensus.Log.Debug("[ERROR] Received message from wrong leader", "myLeaderId", consensus.consensusId, "receivedLeaderId", consensusId)
return
}
// check block hash
if bytes.Compare(blockHash[:], consensus.blockHash[:]) != 0 {
consensus.Log.Debug("[ERROR] Block hash doesn't match")
return
}
// TODO: verify aggregated commits with real schnor cosign verification
// TODO: return the signature(response) to leader // TODO: return the signature(response) to leader
// For now, simply return the private key of this node. // For now, simply return the private key of this node.
@ -178,6 +230,20 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) {
// Set state to RESPONSE_DONE // Set state to RESPONSE_DONE
consensus.state = RESPONSE_DONE consensus.state = RESPONSE_DONE
consensus.consensusId++ consensus.consensusId++
// TODO: think about when validators know about the consensus is reached.
// For now, the blockchain is updated right here.
// TODO: reconstruct the whole block from header and transactions
// For now, we used the stored whole block in consensus.blockHeader
txDecoder := gob.NewDecoder(bytes.NewReader(consensus.blockHeader))
var blockHeaderObj blockchain.Block
err := txDecoder.Decode(&blockHeaderObj)
if err != nil {
consensus.Log.Debug("failed to construct the new block after consensus")
}
consensus.OnConsensusDone(&blockHeaderObj)
} }
// Construct the response message to send to leader (assumption the consensus data is already verified) // Construct the response message to send to leader (assumption the consensus data is already verified)
@ -190,7 +256,7 @@ func (consensus Consensus) constructResponseMessage() []byte {
buffer.Write(fourBytes) buffer.Write(fourBytes)
// 32 byte block hash // 32 byte block hash
buffer.Write(consensus.blockHash) buffer.Write(consensus.blockHash[:32])
// 2 byte validator id // 2 byte validator id
twoBytes := make([]byte, 2) twoBytes := make([]byte, 2)

@ -9,7 +9,7 @@ func TestConstructCommitMessage(test *testing.T) {
leader := p2p.Peer{Ip: "1", Port: "2"} leader := p2p.Peer{Ip: "1", Port: "2"}
validator := p2p.Peer{Ip: "3", Port: "5"} validator := p2p.Peer{Ip: "3", Port: "5"}
consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
consensus.blockHash = getBlockHash(make([]byte, 10)) consensus.blockHash = [32]byte{}
msg := consensus.constructCommitMessage() msg := consensus.constructCommitMessage()
if len(msg) != 1+1+1+4+32+2+33+64 { if len(msg) != 1+1+1+4+32+2+33+64 {
@ -21,7 +21,7 @@ func TestConstructResponseMessage(test *testing.T) {
leader := p2p.Peer{Ip: "1", Port: "2"} leader := p2p.Peer{Ip: "1", Port: "2"}
validator := p2p.Peer{Ip: "3", Port: "5"} validator := p2p.Peer{Ip: "3", Port: "5"}
consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
consensus.blockHash = getBlockHash(make([]byte, 10)) consensus.blockHash = [32]byte{}
msg := consensus.constructResponseMessage() msg := consensus.constructResponseMessage()
if len(msg) != 1+1+1+4+32+2+32+64 { if len(msg) != 1+1+1+4+32+2+32+64 {

Binary file not shown.

@ -0,0 +1,11 @@
Contributors to log15:
- Aaron L
- Alan Shreve
- Chris Hines
- Ciaran Downey
- Dmitry Chestnykh
- Evan Shaw
- Péter Szilágyi
- Trevor Gattis
- Vincent Vanackere

@ -0,0 +1,13 @@
Copyright 2014 Alan Shreve
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

@ -0,0 +1,77 @@
![obligatory xkcd](http://imgs.xkcd.com/comics/standards.png)
# log15 [![godoc reference](https://godoc.org/github.com/inconshreveable/log15?status.png)](https://godoc.org/github.com/inconshreveable/log15) [![Build Status](https://travis-ci.org/inconshreveable/log15.svg?branch=master)](https://travis-ci.org/inconshreveable/log15)
Package log15 provides an opinionated, simple toolkit for best-practice logging in Go (golang) that is both human and machine readable. It is modeled after the Go standard library's [`io`](http://golang.org/pkg/io/) and [`net/http`](http://golang.org/pkg/net/http/) packages and is an alternative to the standard library's [`log`](http://golang.org/pkg/log/) package.
## Features
- A simple, easy-to-understand API
- Promotes structured logging by encouraging use of key/value pairs
- Child loggers which inherit and add their own private context
- Lazy evaluation of expensive operations
- Simple Handler interface allowing for construction of flexible, custom logging configurations with a tiny API.
- Color terminal support
- Built-in support for logging to files, streams, syslog, and the network
- Support for forking records to multiple handlers, buffering records for output, failing over from failed handler writes, + more
## Versioning
The API of the master branch of log15 should always be considered unstable. If you want to rely on a stable API,
you must vendor the library.
## Importing
```go
import log "github.com/inconshreveable/log15"
```
## Examples
```go
// all loggers can have key/value context
srvlog := log.New("module", "app/server")
// all log messages can have key/value context
srvlog.Warn("abnormal conn rate", "rate", curRate, "low", lowRate, "high", highRate)
// child loggers with inherited context
connlog := srvlog.New("raddr", c.RemoteAddr())
connlog.Info("connection open")
// lazy evaluation
connlog.Debug("ping remote", "latency", log.Lazy{pingRemote})
// flexible configuration
srvlog.SetHandler(log.MultiHandler(
log.StreamHandler(os.Stderr, log.LogfmtFormat()),
log.LvlFilterHandler(
log.LvlError,
log.Must.FileHandler("errors.json", log.JSONFormat()))))
```
Will result in output that looks like this:
```
WARN[06-17|21:58:10] abnormal conn rate module=app/server rate=0.500 low=0.100 high=0.800
INFO[06-17|21:58:10] connection open module=app/server raddr=10.0.0.1
```
## Breaking API Changes
The following commits broke API stability. This reference is intended to help you understand the consequences of updating to a newer version
of log15.
- 57a084d014d4150152b19e4e531399a7145d1540 - Added a `Get()` method to the `Logger` interface to retrieve the current handler
- 93404652ee366648fa622b64d1e2b67d75a3094a - `Record` field `Call` changed to `stack.Call` with switch to `github.com/go-stack/stack`
- a5e7613673c73281f58e15a87d2cf0cf111e8152 - Restored `syslog.Priority` argument to the `SyslogXxx` handler constructors
## FAQ
### The varargs style is brittle and error prone! Can I have type safety please?
Yes. Use `log.Ctx`:
```go
srvlog := log.New(log.Ctx{"module": "app/server"})
srvlog.Warn("abnormal conn rate", log.Ctx{"rate": curRate, "low": lowRate, "high": highRate})
```
## License
Apache

@ -0,0 +1,5 @@
This package is a fork of https://github.com/inconshreveable/log15, with some
minor modifications required by the go-ethereum codebase:
* Support for log level `trace`
* Modified behavior to exit on `critical` failure

@ -0,0 +1,334 @@
package log
/*
Package log15 provides an opinionated, simple toolkit for best-practice logging that is
both human and machine readable. It is modeled after the standard library's io and net/http
packages.
This package enforces you to only log key/value pairs. Keys must be strings. Values may be
any type that you like. The default output format is logfmt, but you may also choose to use
JSON instead if that suits you. Here's how you log:
log.Info("page accessed", "path", r.URL.Path, "user_id", user.id)
This will output a line that looks like:
lvl=info t=2014-05-02T16:07:23-0700 msg="page accessed" path=/org/71/profile user_id=9
Getting Started
To get started, you'll want to import the library:
import log "github.com/inconshreveable/log15"
Now you're ready to start logging:
func main() {
log.Info("Program starting", "args", os.Args())
}
Convention
Because recording a human-meaningful message is common and good practice, the first argument to every
logging method is the value to the *implicit* key 'msg'.
Additionally, the level you choose for a message will be automatically added with the key 'lvl', and so
will the current timestamp with key 't'.
You may supply any additional context as a set of key/value pairs to the logging function. log15 allows
you to favor terseness, ordering, and speed over safety. This is a reasonable tradeoff for
logging functions. You don't need to explicitly state keys/values, log15 understands that they alternate
in the variadic argument list:
log.Warn("size out of bounds", "low", lowBound, "high", highBound, "val", val)
If you really do favor your type-safety, you may choose to pass a log.Ctx instead:
log.Warn("size out of bounds", log.Ctx{"low": lowBound, "high": highBound, "val": val})
Context loggers
Frequently, you want to add context to a logger so that you can track actions associated with it. An http
request is a good example. You can easily create new loggers that have context that is automatically included
with each log line:
requestlogger := log.New("path", r.URL.Path)
// later
requestlogger.Debug("db txn commit", "duration", txnTimer.Finish())
This will output a log line that includes the path context that is attached to the logger:
lvl=dbug t=2014-05-02T16:07:23-0700 path=/repo/12/add_hook msg="db txn commit" duration=0.12
Handlers
The Handler interface defines where log lines are printed to and how they are formated. Handler is a
single interface that is inspired by net/http's handler interface:
type Handler interface {
Log(r *Record) error
}
Handlers can filter records, format them, or dispatch to multiple other Handlers.
This package implements a number of Handlers for common logging patterns that are
easily composed to create flexible, custom logging structures.
Here's an example handler that prints logfmt output to Stdout:
handler := log.StreamHandler(os.Stdout, log.LogfmtFormat())
Here's an example handler that defers to two other handlers. One handler only prints records
from the rpc package in logfmt to standard out. The other prints records at Error level
or above in JSON formatted output to the file /var/log/service.json
handler := log.MultiHandler(
log.LvlFilterHandler(log.LvlError, log.Must.FileHandler("/var/log/service.json", log.JSONFormat())),
log.MatchFilterHandler("pkg", "app/rpc" log.StdoutHandler())
)
Logging File Names and Line Numbers
This package implements three Handlers that add debugging information to the
context, CallerFileHandler, CallerFuncHandler and CallerStackHandler. Here's
an example that adds the source file and line number of each logging call to
the context.
h := log.CallerFileHandler(log.StdoutHandler)
log.Root().SetHandler(h)
...
log.Error("open file", "err", err)
This will output a line that looks like:
lvl=eror t=2014-05-02T16:07:23-0700 msg="open file" err="file not found" caller=data.go:42
Here's an example that logs the call stack rather than just the call site.
h := log.CallerStackHandler("%+v", log.StdoutHandler)
log.Root().SetHandler(h)
...
log.Error("open file", "err", err)
This will output a line that looks like:
lvl=eror t=2014-05-02T16:07:23-0700 msg="open file" err="file not found" stack="[pkg/data.go:42 pkg/cmd/main.go]"
The "%+v" format instructs the handler to include the path of the source file
relative to the compile time GOPATH. The github.com/go-stack/stack package
documents the full list of formatting verbs and modifiers available.
Custom Handlers
The Handler interface is so simple that it's also trivial to write your own. Let's create an
example handler which tries to write to one handler, but if that fails it falls back to
writing to another handler and includes the error that it encountered when trying to write
to the primary. This might be useful when trying to log over a network socket, but if that
fails you want to log those records to a file on disk.
type BackupHandler struct {
Primary Handler
Secondary Handler
}
func (h *BackupHandler) Log (r *Record) error {
err := h.Primary.Log(r)
if err != nil {
r.Ctx = append(ctx, "primary_err", err)
return h.Secondary.Log(r)
}
return nil
}
This pattern is so useful that a generic version that handles an arbitrary number of Handlers
is included as part of this library called FailoverHandler.
Logging Expensive Operations
Sometimes, you want to log values that are extremely expensive to compute, but you don't want to pay
the price of computing them if you haven't turned up your logging level to a high level of detail.
This package provides a simple type to annotate a logging operation that you want to be evaluated
lazily, just when it is about to be logged, so that it would not be evaluated if an upstream Handler
filters it out. Just wrap any function which takes no arguments with the log.Lazy type. For example:
func factorRSAKey() (factors []int) {
// return the factors of a very large number
}
log.Debug("factors", log.Lazy{factorRSAKey})
If this message is not logged for any reason (like logging at the Error level), then
factorRSAKey is never evaluated.
Dynamic context values
The same log.Lazy mechanism can be used to attach context to a logger which you want to be
evaluated when the message is logged, but not when the logger is created. For example, let's imagine
a game where you have Player objects:
type Player struct {
name string
alive bool
log.Logger
}
You always want to log a player's name and whether they're alive or dead, so when you create the player
object, you might do:
p := &Player{name: name, alive: true}
p.Logger = log.New("name", p.name, "alive", p.alive)
Only now, even after a player has died, the logger will still report they are alive because the logging
context is evaluated when the logger was created. By using the Lazy wrapper, we can defer the evaluation
of whether the player is alive or not to each log message, so that the log records will reflect the player's
current state no matter when the log message is written:
p := &Player{name: name, alive: true}
isAlive := func() bool { return p.alive }
player.Logger = log.New("name", p.name, "alive", log.Lazy{isAlive})
Terminal Format
If log15 detects that stdout is a terminal, it will configure the default
handler for it (which is log.StdoutHandler) to use TerminalFormat. This format
logs records nicely for your terminal, including color-coded output based
on log level.
Error Handling
Becasuse log15 allows you to step around the type system, there are a few ways you can specify
invalid arguments to the logging functions. You could, for example, wrap something that is not
a zero-argument function with log.Lazy or pass a context key that is not a string. Since logging libraries
are typically the mechanism by which errors are reported, it would be onerous for the logging functions
to return errors. Instead, log15 handles errors by making these guarantees to you:
- Any log record containing an error will still be printed with the error explained to you as part of the log record.
- Any log record containing an error will include the context key LOG15_ERROR, enabling you to easily
(and if you like, automatically) detect if any of your logging calls are passing bad values.
Understanding this, you might wonder why the Handler interface can return an error value in its Log method. Handlers
are encouraged to return errors only if they fail to write their log records out to an external source like if the
syslog daemon is not responding. This allows the construction of useful handlers which cope with those failures
like the FailoverHandler.
Library Use
log15 is intended to be useful for library authors as a way to provide configurable logging to
users of their library. Best practice for use in a library is to always disable all output for your logger
by default and to provide a public Logger instance that consumers of your library can configure. Like so:
package yourlib
import "github.com/inconshreveable/log15"
var Log = log.New()
func init() {
Log.SetHandler(log.DiscardHandler())
}
Users of your library may then enable it if they like:
import "github.com/inconshreveable/log15"
import "example.com/yourlib"
func main() {
handler := // custom handler setup
yourlib.Log.SetHandler(handler)
}
Best practices attaching logger context
The ability to attach context to a logger is a powerful one. Where should you do it and why?
I favor embedding a Logger directly into any persistent object in my application and adding
unique, tracing context keys to it. For instance, imagine I am writing a web browser:
type Tab struct {
url string
render *RenderingContext
// ...
Logger
}
func NewTab(url string) *Tab {
return &Tab {
// ...
url: url,
Logger: log.New("url", url),
}
}
When a new tab is created, I assign a logger to it with the url of
the tab as context so it can easily be traced through the logs.
Now, whenever we perform any operation with the tab, we'll log with its
embedded logger and it will include the tab title automatically:
tab.Debug("moved position", "idx", tab.idx)
There's only one problem. What if the tab url changes? We could
use log.Lazy to make sure the current url is always written, but that
would mean that we couldn't trace a tab's full lifetime through our
logs after the user navigate to a new URL.
Instead, think about what values to attach to your loggers the
same way you think about what to use as a key in a SQL database schema.
If it's possible to use a natural key that is unique for the lifetime of the
object, do so. But otherwise, log15's ext package has a handy RandId
function to let you generate what you might call "surrogate keys"
They're just random hex identifiers to use for tracing. Back to our
Tab example, we would prefer to set up our Logger like so:
import logext "github.com/inconshreveable/log15/ext"
t := &Tab {
// ...
url: url,
}
t.Logger = log.New("id", logext.RandId(8), "url", log.Lazy{t.getUrl})
return t
Now we'll have a unique traceable identifier even across loading new urls, but
we'll still be able to see the tab's current url in the log messages.
Must
For all Handler functions which can return an error, there is a version of that
function which will return no error but panics on failure. They are all available
on the Must object. For example:
log.Must.FileHandler("/path", log.JSONFormat)
log.Must.NetHandler("tcp", ":1234", log.JSONFormat)
Inspiration and Credit
All of the following excellent projects inspired the design of this library:
code.google.com/p/log4go
github.com/op/go-logging
github.com/technoweenie/grohl
github.com/Sirupsen/logrus
github.com/kr/logfmt
github.com/spacemonkeygo/spacelog
golang's stdlib, notably io and net/http
The Name
https://xkcd.com/927/
*/

@ -0,0 +1,364 @@
package log
import (
"bytes"
"encoding/json"
"fmt"
"reflect"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"unicode/utf8"
)
const (
timeFormat = "2006-01-02T15:04:05-0700"
termTimeFormat = "01-02|15:04:05.999999"
floatFormat = 'f'
termMsgJust = 40
)
// locationTrims are trimmed for display to avoid unwieldy log lines.
var locationTrims = []string{
"github.com/ethereum/go-ethereum/",
}
// PrintOrigins sets or unsets log location (file:line) printing for terminal
// format output.
func PrintOrigins(print bool) {
if print {
atomic.StoreUint32(&locationEnabled, 1)
} else {
atomic.StoreUint32(&locationEnabled, 0)
}
}
// locationEnabled is an atomic flag controlling whether the terminal formatter
// should append the log locations too when printing entries.
var locationEnabled uint32
// locationLength is the maxmimum path length encountered, which all logs are
// padded to to aid in alignment.
var locationLength uint32
// fieldPadding is a global map with maximum field value lengths seen until now
// to allow padding log contexts in a bit smarter way.
var fieldPadding = make(map[string]int)
// fieldPaddingLock is a global mutex protecting the field padding map.
var fieldPaddingLock sync.RWMutex
// Format interface.
type Format interface {
Format(r *Record) []byte
}
// FormatFunc returns a new Format object which uses
// the given function to perform record formatting.
func FormatFunc(f func(*Record) []byte) Format {
return formatFunc(f)
}
type formatFunc func(*Record) []byte
func (f formatFunc) Format(r *Record) []byte {
return f(r)
}
// TerminalStringer is an analogous interface to the stdlib stringer, allowing
// own types to have custom shortened serialization formats when printed to the
// screen.
type TerminalStringer interface {
TerminalString() string
}
// TerminalFormat formats log records optimized for human readability on
// a terminal with color-coded level output and terser human friendly timestamp.
// This format should only be used for interactive programs or while developing.
//
// [TIME] [LEVEL] MESAGE key=value key=value ...
//
// Example:
//
// [May 16 20:58:45] [DBUG] remove route ns=haproxy addr=127.0.0.1:50002
//
func TerminalFormat(usecolor bool) Format {
return FormatFunc(func(r *Record) []byte {
var color = 0
if usecolor {
switch r.Lvl {
case LvlCrit:
color = 35
case LvlError:
color = 31
case LvlWarn:
color = 33
case LvlInfo:
color = 32
case LvlDebug:
color = 36
case LvlTrace:
color = 34
}
}
b := &bytes.Buffer{}
lvl := r.Lvl.AlignedString()
if atomic.LoadUint32(&locationEnabled) != 0 {
// Log origin printing was requested, format the location path and line number
location := fmt.Sprintf("%+v", r.Call)
for _, prefix := range locationTrims {
location = strings.TrimPrefix(location, prefix)
}
// Maintain the maximum location length for fancyer alignment
align := int(atomic.LoadUint32(&locationLength))
if align < len(location) {
align = len(location)
atomic.StoreUint32(&locationLength, uint32(align))
}
padding := strings.Repeat(" ", align-len(location))
// Assemble and print the log heading
if color > 0 {
fmt.Fprintf(b, "\x1b[%dm%s\x1b[0m[%s|%s]%s %s ", color, lvl, r.Time.Format(termTimeFormat), location, padding, r.Msg)
} else {
fmt.Fprintf(b, "%s[%s|%s]%s %s ", lvl, r.Time.Format(termTimeFormat), location, padding, r.Msg)
}
} else {
if color > 0 {
fmt.Fprintf(b, "\x1b[%dm%s\x1b[0m[%s] %s ", color, lvl, r.Time.Format(termTimeFormat), r.Msg)
} else {
fmt.Fprintf(b, "%s[%s] %s ", lvl, r.Time.Format(termTimeFormat), r.Msg)
}
}
// try to justify the log output for short messages
length := utf8.RuneCountInString(r.Msg)
if len(r.Ctx) > 0 && length < termMsgJust {
b.Write(bytes.Repeat([]byte{' '}, termMsgJust-length))
}
// print the keys logfmt style
logfmt(b, r.Ctx, color, true)
return b.Bytes()
})
}
// LogfmtFormat prints records in logfmt format, an easy machine-parseable but human-readable
// format for key/value pairs.
//
// For more details see: http://godoc.org/github.com/kr/logfmt
//
func LogfmtFormat() Format {
return FormatFunc(func(r *Record) []byte {
common := []interface{}{r.KeyNames.Time, r.Time, r.KeyNames.Lvl, r.Lvl, r.KeyNames.Msg, r.Msg}
buf := &bytes.Buffer{}
logfmt(buf, append(common, r.Ctx...), 0, false)
return buf.Bytes()
})
}
func logfmt(buf *bytes.Buffer, ctx []interface{}, color int, term bool) {
for i := 0; i < len(ctx); i += 2 {
if i != 0 {
buf.WriteByte(' ')
}
k, ok := ctx[i].(string)
v := formatLogfmtValue(ctx[i+1], term)
if !ok {
k, v = errorKey, formatLogfmtValue(k, term)
}
// XXX: we should probably check that all of your key bytes aren't invalid
fieldPaddingLock.RLock()
padding := fieldPadding[k]
fieldPaddingLock.RUnlock()
length := utf8.RuneCountInString(v)
if padding < length {
padding = length
fieldPaddingLock.Lock()
fieldPadding[k] = padding
fieldPaddingLock.Unlock()
}
if color > 0 {
fmt.Fprintf(buf, "\x1b[%dm%s\x1b[0m=", color, k)
} else {
buf.WriteString(k)
buf.WriteByte('=')
}
buf.WriteString(v)
if i < len(ctx)-2 {
buf.Write(bytes.Repeat([]byte{' '}, padding-length))
}
}
buf.WriteByte('\n')
}
// JSONFormat formats log records as JSON objects separated by newlines.
// It is the equivalent of JSONFormatEx(false, true).
func JSONFormat() Format {
return JSONFormatEx(false, true)
}
// JSONFormatEx formats log records as JSON objects. If pretty is true,
// records will be pretty-printed. If lineSeparated is true, records
// will be logged with a new line between each record.
func JSONFormatEx(pretty, lineSeparated bool) Format {
jsonMarshal := json.Marshal
if pretty {
jsonMarshal = func(v interface{}) ([]byte, error) {
return json.MarshalIndent(v, "", " ")
}
}
return FormatFunc(func(r *Record) []byte {
props := make(map[string]interface{})
props[r.KeyNames.Time] = r.Time
props[r.KeyNames.Lvl] = r.Lvl.String()
props[r.KeyNames.Msg] = r.Msg
for i := 0; i < len(r.Ctx); i += 2 {
k, ok := r.Ctx[i].(string)
if !ok {
props[errorKey] = fmt.Sprintf("%+v is not a string key", r.Ctx[i])
}
props[k] = formatJSONValue(r.Ctx[i+1])
}
b, err := jsonMarshal(props)
if err != nil {
b, _ = jsonMarshal(map[string]string{
errorKey: err.Error(),
})
return b
}
if lineSeparated {
b = append(b, '\n')
}
return b
})
}
func formatShared(value interface{}) (result interface{}) {
defer func() {
if err := recover(); err != nil {
if v := reflect.ValueOf(value); v.Kind() == reflect.Ptr && v.IsNil() {
result = "nil"
} else {
panic(err)
}
}
}()
switch v := value.(type) {
case time.Time:
return v.Format(timeFormat)
case error:
return v.Error()
case fmt.Stringer:
return v.String()
default:
return v
}
}
func formatJSONValue(value interface{}) interface{} {
value = formatShared(value)
switch value.(type) {
case int, int8, int16, int32, int64, float32, float64, uint, uint8, uint16, uint32, uint64, string:
return value
default:
return fmt.Sprintf("%+v", value)
}
}
// formatValue formats a value for serialization
func formatLogfmtValue(value interface{}, term bool) string {
if value == nil {
return "nil"
}
if t, ok := value.(time.Time); ok {
// Performance optimization: No need for escaping since the provided
// timeFormat doesn't have any escape characters, and escaping is
// expensive.
return t.Format(timeFormat)
}
if term {
if s, ok := value.(TerminalStringer); ok {
// Custom terminal stringer provided, use that
return escapeString(s.TerminalString())
}
}
value = formatShared(value)
switch v := value.(type) {
case bool:
return strconv.FormatBool(v)
case float32:
return strconv.FormatFloat(float64(v), floatFormat, 3, 64)
case float64:
return strconv.FormatFloat(v, floatFormat, 3, 64)
case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64:
return fmt.Sprintf("%d", value)
case string:
return escapeString(v)
default:
return escapeString(fmt.Sprintf("%+v", value))
}
}
var stringBufPool = sync.Pool{
New: func() interface{} { return new(bytes.Buffer) },
}
func escapeString(s string) string {
needsQuotes := false
needsEscape := false
for _, r := range s {
if r <= ' ' || r == '=' || r == '"' {
needsQuotes = true
}
if r == '\\' || r == '"' || r == '\n' || r == '\r' || r == '\t' {
needsEscape = true
}
}
if !needsEscape && !needsQuotes {
return s
}
e := stringBufPool.Get().(*bytes.Buffer)
e.WriteByte('"')
for _, r := range s {
switch r {
case '\\', '"':
e.WriteByte('\\')
e.WriteByte(byte(r))
case '\n':
e.WriteString("\\n")
case '\r':
e.WriteString("\\r")
case '\t':
e.WriteString("\\t")
default:
e.WriteRune(r)
}
}
e.WriteByte('"')
var ret string
if needsQuotes {
ret = e.String()
} else {
ret = string(e.Bytes()[1 : e.Len()-1])
}
e.Reset()
stringBufPool.Put(e)
return ret
}

@ -0,0 +1,359 @@
package log
import (
"fmt"
"io"
"net"
"os"
"reflect"
"sync"
"github.com/go-stack/stack"
)
// Handler defines where and how log records are written.
// A Logger prints its log records by writing to a Handler.
// Handlers are composable, providing you great flexibility in combining
// them to achieve the logging structure that suits your applications.
type Handler interface {
Log(r *Record) error
}
// FuncHandler returns a Handler that logs records with the given
// function.
func FuncHandler(fn func(r *Record) error) Handler {
return funcHandler(fn)
}
type funcHandler func(r *Record) error
func (h funcHandler) Log(r *Record) error {
return h(r)
}
// StreamHandler writes log records to an io.Writer
// with the given format. StreamHandler can be used
// to easily begin writing log records to other
// outputs.
//
// StreamHandler wraps itself with LazyHandler and SyncHandler
// to evaluate Lazy objects and perform safe concurrent writes.
func StreamHandler(wr io.Writer, fmtr Format) Handler {
h := FuncHandler(func(r *Record) error {
_, err := wr.Write(fmtr.Format(r))
return err
})
return LazyHandler(SyncHandler(h))
}
// SyncHandler can be wrapped around a handler to guarantee that
// only a single Log operation can proceed at a time. It's necessary
// for thread-safe concurrent writes.
func SyncHandler(h Handler) Handler {
var mu sync.Mutex
return FuncHandler(func(r *Record) error {
defer mu.Unlock()
mu.Lock()
return h.Log(r)
})
}
// FileHandler returns a handler which writes log records to the give file
// using the given format. If the path
// already exists, FileHandler will append to the given file. If it does not,
// FileHandler will create the file with mode 0644.
func FileHandler(path string, fmtr Format) (Handler, error) {
f, err := os.OpenFile(path, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0644)
if err != nil {
return nil, err
}
return closingHandler{f, StreamHandler(f, fmtr)}, nil
}
// NetHandler opens a socket to the given address and writes records
// over the connection.
func NetHandler(network, addr string, fmtr Format) (Handler, error) {
conn, err := net.Dial(network, addr)
if err != nil {
return nil, err
}
return closingHandler{conn, StreamHandler(conn, fmtr)}, nil
}
// XXX: closingHandler is essentially unused at the moment
// it's meant for a future time when the Handler interface supports
// a possible Close() operation
type closingHandler struct {
io.WriteCloser
Handler
}
func (h *closingHandler) Close() error {
return h.WriteCloser.Close()
}
// CallerFileHandler returns a Handler that adds the line number and file of
// the calling function to the context with key "caller".
func CallerFileHandler(h Handler) Handler {
return FuncHandler(func(r *Record) error {
r.Ctx = append(r.Ctx, "caller", fmt.Sprint(r.Call))
return h.Log(r)
})
}
// CallerFuncHandler returns a Handler that adds the calling function name to
// the context with key "fn".
func CallerFuncHandler(h Handler) Handler {
return FuncHandler(func(r *Record) error {
r.Ctx = append(r.Ctx, "fn", formatCall("%+n", r.Call))
return h.Log(r)
})
}
// This function is here to please go vet on Go < 1.8.
func formatCall(format string, c stack.Call) string {
return fmt.Sprintf(format, c)
}
// CallerStackHandler returns a Handler that adds a stack trace to the context
// with key "stack". The stack trace is formated as a space separated list of
// call sites inside matching []'s. The most recent call site is listed first.
// Each call site is formatted according to format. See the documentation of
// package github.com/go-stack/stack for the list of supported formats.
func CallerStackHandler(format string, h Handler) Handler {
return FuncHandler(func(r *Record) error {
s := stack.Trace().TrimBelow(r.Call).TrimRuntime()
if len(s) > 0 {
r.Ctx = append(r.Ctx, "stack", fmt.Sprintf(format, s))
}
return h.Log(r)
})
}
// FilterHandler returns a Handler that only writes records to the
// wrapped Handler if the given function evaluates true. For example,
// to only log records where the 'err' key is not nil:
//
// logger.SetHandler(FilterHandler(func(r *Record) bool {
// for i := 0; i < len(r.Ctx); i += 2 {
// if r.Ctx[i] == "err" {
// return r.Ctx[i+1] != nil
// }
// }
// return false
// }, h))
//
func FilterHandler(fn func(r *Record) bool, h Handler) Handler {
return FuncHandler(func(r *Record) error {
if fn(r) {
return h.Log(r)
}
return nil
})
}
// MatchFilterHandler returns a Handler that only writes records
// to the wrapped Handler if the given key in the logged
// context matches the value. For example, to only log records
// from your ui package:
//
// log.MatchFilterHandler("pkg", "app/ui", log.StdoutHandler)
//
func MatchFilterHandler(key string, value interface{}, h Handler) Handler {
return FilterHandler(func(r *Record) (pass bool) {
switch key {
case r.KeyNames.Lvl:
return r.Lvl == value
case r.KeyNames.Time:
return r.Time == value
case r.KeyNames.Msg:
return r.Msg == value
}
for i := 0; i < len(r.Ctx); i += 2 {
if r.Ctx[i] == key {
return r.Ctx[i+1] == value
}
}
return false
}, h)
}
// LvlFilterHandler returns a Handler that only writes
// records which are less than the given verbosity
// level to the wrapped Handler. For example, to only
// log Error/Crit records:
//
// log.LvlFilterHandler(log.LvlError, log.StdoutHandler)
//
func LvlFilterHandler(maxLvl Lvl, h Handler) Handler {
return FilterHandler(func(r *Record) (pass bool) {
return r.Lvl <= maxLvl
}, h)
}
// MultiHandler dispatches any write to each of its handlers.
// This is useful for writing different types of log information
// to different locations. For example, to log to a file and
// standard error:
//
// log.MultiHandler(
// log.Must.FileHandler("/var/log/app.log", log.LogfmtFormat()),
// log.StderrHandler)
//
func MultiHandler(hs ...Handler) Handler {
return FuncHandler(func(r *Record) error {
for _, h := range hs {
// what to do about failures?
h.Log(r)
}
return nil
})
}
// FailoverHandler writes all log records to the first handler
// specified, but will failover and write to the second handler if
// the first handler has failed, and so on for all handlers specified.
// For example you might want to log to a network socket, but failover
// to writing to a file if the network fails, and then to
// standard out if the file write fails:
//
// log.FailoverHandler(
// log.Must.NetHandler("tcp", ":9090", log.JSONFormat()),
// log.Must.FileHandler("/var/log/app.log", log.LogfmtFormat()),
// log.StdoutHandler)
//
// All writes that do not go to the first handler will add context with keys of
// the form "failover_err_{idx}" which explain the error encountered while
// trying to write to the handlers before them in the list.
func FailoverHandler(hs ...Handler) Handler {
return FuncHandler(func(r *Record) error {
var err error
for i, h := range hs {
err = h.Log(r)
if err == nil {
return nil
}
r.Ctx = append(r.Ctx, fmt.Sprintf("failover_err_%d", i), err)
}
return err
})
}
// ChannelHandler writes all records to the given channel.
// It blocks if the channel is full. Useful for async processing
// of log messages, it's used by BufferedHandler.
func ChannelHandler(recs chan<- *Record) Handler {
return FuncHandler(func(r *Record) error {
recs <- r
return nil
})
}
// BufferedHandler writes all records to a buffered
// channel of the given size which flushes into the wrapped
// handler whenever it is available for writing. Since these
// writes happen asynchronously, all writes to a BufferedHandler
// never return an error and any errors from the wrapped handler are ignored.
func BufferedHandler(bufSize int, h Handler) Handler {
recs := make(chan *Record, bufSize)
go func() {
for m := range recs {
_ = h.Log(m)
}
}()
return ChannelHandler(recs)
}
// LazyHandler writes all values to the wrapped handler after evaluating
// any lazy functions in the record's context. It is already wrapped
// around StreamHandler and SyslogHandler in this library, you'll only need
// it if you write your own Handler.
func LazyHandler(h Handler) Handler {
return FuncHandler(func(r *Record) error {
// go through the values (odd indices) and reassign
// the values of any lazy fn to the result of its execution
hadErr := false
for i := 1; i < len(r.Ctx); i += 2 {
lz, ok := r.Ctx[i].(Lazy)
if ok {
v, err := evaluateLazy(lz)
if err != nil {
hadErr = true
r.Ctx[i] = err
} else {
if cs, ok := v.(stack.CallStack); ok {
v = cs.TrimBelow(r.Call).TrimRuntime()
}
r.Ctx[i] = v
}
}
}
if hadErr {
r.Ctx = append(r.Ctx, errorKey, "bad lazy")
}
return h.Log(r)
})
}
func evaluateLazy(lz Lazy) (interface{}, error) {
t := reflect.TypeOf(lz.Fn)
if t.Kind() != reflect.Func {
return nil, fmt.Errorf("INVALID_LAZY, not func: %+v", lz.Fn)
}
if t.NumIn() > 0 {
return nil, fmt.Errorf("INVALID_LAZY, func takes args: %+v", lz.Fn)
}
if t.NumOut() == 0 {
return nil, fmt.Errorf("INVALID_LAZY, no func return val: %+v", lz.Fn)
}
value := reflect.ValueOf(lz.Fn)
results := value.Call([]reflect.Value{})
if len(results) == 1 {
return results[0].Interface(), nil
}
values := make([]interface{}, len(results))
for i, v := range results {
values[i] = v.Interface()
}
return values, nil
}
// DiscardHandler reports success for all writes but does nothing.
// It is useful for dynamically disabling logging at runtime via
// a Logger's SetHandler method.
func DiscardHandler() Handler {
return FuncHandler(func(r *Record) error {
return nil
})
}
// Must provides the following Handler creation functions
// which instead of returning an error parameter only return a Handler
// and panic on failure: FileHandler, NetHandler, SyslogHandler, SyslogNetHandler
var Must muster
func must(h Handler, err error) Handler {
if err != nil {
panic(err)
}
return h
}
type muster struct{}
func (m muster) FileHandler(path string, fmtr Format) Handler {
return must(FileHandler(path, fmtr))
}
func (m muster) NetHandler(network, addr string, fmtr Format) Handler {
return must(NetHandler(network, addr, fmtr))
}

@ -0,0 +1,227 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package log
import (
"errors"
"fmt"
"regexp"
"runtime"
"strconv"
"strings"
"sync"
"sync/atomic"
)
// errVmoduleSyntax is returned when a user vmodule pattern is invalid.
var errVmoduleSyntax = errors.New("expect comma-separated list of filename=N")
// errTraceSyntax is returned when a user backtrace pattern is invalid.
var errTraceSyntax = errors.New("expect file.go:234")
// GlogHandler is a log handler that mimics the filtering features of Google's
// glog logger: setting global log levels; overriding with callsite pattern
// matches; and requesting backtraces at certain positions.
type GlogHandler struct {
origin Handler // The origin handler this wraps
level uint32 // Current log level, atomically accessible
override uint32 // Flag whether overrides are used, atomically accessible
backtrace uint32 // Flag whether backtrace location is set
patterns []pattern // Current list of patterns to override with
siteCache map[uintptr]Lvl // Cache of callsite pattern evaluations
location string // file:line location where to do a stackdump at
lock sync.RWMutex // Lock protecting the override pattern list
}
// NewGlogHandler creates a new log handler with filtering functionality similar
// to Google's glog logger. The returned handler implements Handler.
func NewGlogHandler(h Handler) *GlogHandler {
return &GlogHandler{
origin: h,
}
}
// pattern contains a filter for the Vmodule option, holding a verbosity level
// and a file pattern to match.
type pattern struct {
pattern *regexp.Regexp
level Lvl
}
// Verbosity sets the glog verbosity ceiling. The verbosity of individual packages
// and source files can be raised using Vmodule.
func (h *GlogHandler) Verbosity(level Lvl) {
atomic.StoreUint32(&h.level, uint32(level))
}
// Vmodule sets the glog verbosity pattern.
//
// The syntax of the argument is a comma-separated list of pattern=N, where the
// pattern is a literal file name or "glob" pattern matching and N is a V level.
//
// For instance:
//
// pattern="gopher.go=3"
// sets the V level to 3 in all Go files named "gopher.go"
//
// pattern="foo=3"
// sets V to 3 in all files of any packages whose import path ends in "foo"
//
// pattern="foo/*=3"
// sets V to 3 in all files of any packages whose import path contains "foo"
func (h *GlogHandler) Vmodule(ruleset string) error {
var filter []pattern
for _, rule := range strings.Split(ruleset, ",") {
// Empty strings such as from a trailing comma can be ignored
if len(rule) == 0 {
continue
}
// Ensure we have a pattern = level filter rule
parts := strings.Split(rule, "=")
if len(parts) != 2 {
return errVmoduleSyntax
}
parts[0] = strings.TrimSpace(parts[0])
parts[1] = strings.TrimSpace(parts[1])
if len(parts[0]) == 0 || len(parts[1]) == 0 {
return errVmoduleSyntax
}
// Parse the level and if correct, assemble the filter rule
level, err := strconv.Atoi(parts[1])
if err != nil {
return errVmoduleSyntax
}
if level <= 0 {
continue // Ignore. It's harmless but no point in paying the overhead.
}
// Compile the rule pattern into a regular expression
matcher := ".*"
for _, comp := range strings.Split(parts[0], "/") {
if comp == "*" {
matcher += "(/.*)?"
} else if comp != "" {
matcher += "/" + regexp.QuoteMeta(comp)
}
}
if !strings.HasSuffix(parts[0], ".go") {
matcher += "/[^/]+\\.go"
}
matcher = matcher + "$"
re, _ := regexp.Compile(matcher)
filter = append(filter, pattern{re, Lvl(level)})
}
// Swap out the vmodule pattern for the new filter system
h.lock.Lock()
defer h.lock.Unlock()
h.patterns = filter
h.siteCache = make(map[uintptr]Lvl)
atomic.StoreUint32(&h.override, uint32(len(filter)))
return nil
}
// BacktraceAt sets the glog backtrace location. When set to a file and line
// number holding a logging statement, a stack trace will be written to the Info
// log whenever execution hits that statement.
//
// Unlike with Vmodule, the ".go" must be present.
func (h *GlogHandler) BacktraceAt(location string) error {
// Ensure the backtrace location contains two non-empty elements
parts := strings.Split(location, ":")
if len(parts) != 2 {
return errTraceSyntax
}
parts[0] = strings.TrimSpace(parts[0])
parts[1] = strings.TrimSpace(parts[1])
if len(parts[0]) == 0 || len(parts[1]) == 0 {
return errTraceSyntax
}
// Ensure the .go prefix is present and the line is valid
if !strings.HasSuffix(parts[0], ".go") {
return errTraceSyntax
}
if _, err := strconv.Atoi(parts[1]); err != nil {
return errTraceSyntax
}
// All seems valid
h.lock.Lock()
defer h.lock.Unlock()
h.location = location
atomic.StoreUint32(&h.backtrace, uint32(len(location)))
return nil
}
// Log implements Handler.Log, filtering a log record through the global, local
// and backtrace filters, finally emitting it if either allow it through.
func (h *GlogHandler) Log(r *Record) error {
// If backtracing is requested, check whether this is the callsite
if atomic.LoadUint32(&h.backtrace) > 0 {
// Everything below here is slow. Although we could cache the call sites the
// same way as for vmodule, backtracing is so rare it's not worth the extra
// complexity.
h.lock.RLock()
match := h.location == r.Call.String()
h.lock.RUnlock()
if match {
// Callsite matched, raise the log level to info and gather the stacks
r.Lvl = LvlInfo
buf := make([]byte, 1024*1024)
buf = buf[:runtime.Stack(buf, true)]
r.Msg += "\n\n" + string(buf)
}
}
// If the global log level allows, fast track logging
if atomic.LoadUint32(&h.level) >= uint32(r.Lvl) {
return h.origin.Log(r)
}
// If no local overrides are present, fast track skipping
if atomic.LoadUint32(&h.override) == 0 {
return nil
}
// Check callsite cache for previously calculated log levels
h.lock.RLock()
lvl, ok := h.siteCache[r.Call.PC()]
h.lock.RUnlock()
// If we didn't cache the callsite yet, calculate it
if !ok {
h.lock.Lock()
for _, rule := range h.patterns {
if rule.pattern.MatchString(fmt.Sprintf("%+s", r.Call)) {
h.siteCache[r.Call.PC()], lvl, ok = rule.level, rule.level, true
break
}
}
// If no rule matched, remember to drop log the next time
if !ok {
h.siteCache[r.Call.PC()] = 0
}
h.lock.Unlock()
}
if lvl >= r.Lvl {
return h.origin.Log(r)
}
return nil
}

@ -0,0 +1,26 @@
// +build !go1.4
package log
import (
"sync/atomic"
"unsafe"
)
// swapHandler wraps another handler that may be swapped out
// dynamically at runtime in a thread-safe fashion.
type swapHandler struct {
handler unsafe.Pointer
}
func (h *swapHandler) Log(r *Record) error {
return h.Get().Log(r)
}
func (h *swapHandler) Get() Handler {
return *(*Handler)(atomic.LoadPointer(&h.handler))
}
func (h *swapHandler) Swap(newHandler Handler) {
atomic.StorePointer(&h.handler, unsafe.Pointer(&newHandler))
}

@ -0,0 +1,23 @@
// +build go1.4
package log
import "sync/atomic"
// swapHandler wraps another handler that may be swapped out
// dynamically at runtime in a thread-safe fashion.
type swapHandler struct {
handler atomic.Value
}
func (h *swapHandler) Log(r *Record) error {
return (*h.handler.Load().(*Handler)).Log(r)
}
func (h *swapHandler) Swap(newHandler Handler) {
h.handler.Store(&newHandler)
}
func (h *swapHandler) Get() Handler {
return *h.handler.Load().(*Handler)
}

@ -0,0 +1,244 @@
package log
import (
"fmt"
"os"
"time"
"github.com/go-stack/stack"
)
const timeKey = "t"
const lvlKey = "lvl"
const msgKey = "msg"
const errorKey = "LOG15_ERROR"
const skipLevel = 2
// Lvl type.
type Lvl int
// Constants.
const (
LvlCrit Lvl = iota
LvlError
LvlWarn
LvlInfo
LvlDebug
LvlTrace
)
// AlignedString returns a 5-character string containing the name of a Lvl.
func (l Lvl) AlignedString() string {
switch l {
case LvlTrace:
return "TRACE"
case LvlDebug:
return "DEBUG"
case LvlInfo:
return "INFO "
case LvlWarn:
return "WARN "
case LvlError:
return "ERROR"
case LvlCrit:
return "CRIT "
default:
panic("bad level")
}
}
// Strings returns the name of a Lvl.
func (l Lvl) String() string {
switch l {
case LvlTrace:
return "trce"
case LvlDebug:
return "dbug"
case LvlInfo:
return "info"
case LvlWarn:
return "warn"
case LvlError:
return "eror"
case LvlCrit:
return "crit"
default:
panic("bad level")
}
}
// LvlFromString returns the appropriate Lvl from a string name.
// Useful for parsing command line args and configuration files.
func LvlFromString(lvlString string) (Lvl, error) {
switch lvlString {
case "trace", "trce":
return LvlTrace, nil
case "debug", "dbug":
return LvlDebug, nil
case "info":
return LvlInfo, nil
case "warn":
return LvlWarn, nil
case "error", "eror":
return LvlError, nil
case "crit":
return LvlCrit, nil
default:
return LvlDebug, fmt.Errorf("Unknown level: %v", lvlString)
}
}
// A Record is what a Logger asks its handler to write
type Record struct {
Time time.Time
Lvl Lvl
Msg string
Ctx []interface{}
Call stack.Call
KeyNames RecordKeyNames
}
// RecordKeyNames gets stored in a Record when the write function is executed.
type RecordKeyNames struct {
Time string
Msg string
Lvl string
}
// A Logger writes key/value pairs to a Handler
type Logger interface {
// New returns a new Logger that has this logger's context plus the given context
New(ctx ...interface{}) Logger
// GetHandler gets the handler associated with the logger.
GetHandler() Handler
// SetHandler updates the logger to write records to the specified handler.
SetHandler(h Handler)
// Log a message at the given level with context key/value pairs
Trace(msg string, ctx ...interface{})
Debug(msg string, ctx ...interface{})
Info(msg string, ctx ...interface{})
Warn(msg string, ctx ...interface{})
Error(msg string, ctx ...interface{})
Crit(msg string, ctx ...interface{})
}
type logger struct {
ctx []interface{}
h *swapHandler
}
func (l *logger) write(msg string, lvl Lvl, ctx []interface{}, skip int) {
l.h.Log(&Record{
Time: time.Now(),
Lvl: lvl,
Msg: msg,
Ctx: newContext(l.ctx, ctx),
Call: stack.Caller(skip),
KeyNames: RecordKeyNames{
Time: timeKey,
Msg: msgKey,
Lvl: lvlKey,
},
})
}
func (l *logger) New(ctx ...interface{}) Logger {
child := &logger{newContext(l.ctx, ctx), new(swapHandler)}
child.SetHandler(l.h)
return child
}
func newContext(prefix []interface{}, suffix []interface{}) []interface{} {
normalizedSuffix := normalize(suffix)
newCtx := make([]interface{}, len(prefix)+len(normalizedSuffix))
n := copy(newCtx, prefix)
copy(newCtx[n:], normalizedSuffix)
return newCtx
}
func (l *logger) Trace(msg string, ctx ...interface{}) {
l.write(msg, LvlTrace, ctx, skipLevel)
}
func (l *logger) Debug(msg string, ctx ...interface{}) {
l.write(msg, LvlDebug, ctx, skipLevel)
}
func (l *logger) Info(msg string, ctx ...interface{}) {
l.write(msg, LvlInfo, ctx, skipLevel)
}
func (l *logger) Warn(msg string, ctx ...interface{}) {
l.write(msg, LvlWarn, ctx, skipLevel)
}
func (l *logger) Error(msg string, ctx ...interface{}) {
l.write(msg, LvlError, ctx, skipLevel)
}
func (l *logger) Crit(msg string, ctx ...interface{}) {
l.write(msg, LvlCrit, ctx, skipLevel)
os.Exit(1)
}
func (l *logger) GetHandler() Handler {
return l.h.Get()
}
func (l *logger) SetHandler(h Handler) {
l.h.Swap(h)
}
func normalize(ctx []interface{}) []interface{} {
// if the caller passed a Ctx object, then expand it
if len(ctx) == 1 {
if ctxMap, ok := ctx[0].(Ctx); ok {
ctx = ctxMap.toArray()
}
}
// ctx needs to be even because it's a series of key/value pairs
// no one wants to check for errors on logging functions,
// so instead of erroring on bad input, we'll just make sure
// that things are the right length and users can fix bugs
// when they see the output looks wrong
if len(ctx)%2 != 0 {
ctx = append(ctx, nil, errorKey, "Normalized odd number of arguments by adding nil")
}
return ctx
}
// Lazy allows you to defer calculation of a logged value that is expensive
// to compute until it is certain that it must be evaluated with the given filters.
//
// Lazy may also be used in conjunction with a Logger's New() function
// to generate a child logger which always reports the current value of changing
// state.
//
// You may wrap any function which takes no arguments to Lazy. It may return any
// number of values of any type.
type Lazy struct {
Fn interface{}
}
// Ctx is a map of key/value pairs to pass as context to a log function
// Use this only if you really need greater safety around the arguments you pass
// to the logging functions.
type Ctx map[string]interface{}
func (c Ctx) toArray() []interface{} {
arr := make([]interface{}, len(c)*2)
i := 0
for k, v := range c {
arr[i] = k
arr[i+1] = v
i += 2
}
return arr
}

@ -0,0 +1,72 @@
package log
import (
"os"
)
var (
root = &logger{[]interface{}{}, new(swapHandler)}
// StdoutHandler handles stdout
StdoutHandler = StreamHandler(os.Stdout, LogfmtFormat())
// StderrHandler handles stderr
StderrHandler = StreamHandler(os.Stderr, LogfmtFormat())
)
func init() {
root.SetHandler(DiscardHandler())
}
// New returns a new logger with the given context.
// New is a convenient alias for Root().New
func New(ctx ...interface{}) Logger {
return root.New(ctx...)
}
// Root returns the root logger
func Root() Logger {
return root
}
// The following functions bypass the exported logger methods (logger.Debug,
// etc.) to keep the call depth the same for all paths to logger.write so
// runtime.Caller(2) always refers to the call site in client code.
// Trace is a convenient alias for Root().Trace
func Trace(msg string, ctx ...interface{}) {
root.write(msg, LvlTrace, ctx, skipLevel)
}
// Debug is a convenient alias for Root().Debug
func Debug(msg string, ctx ...interface{}) {
root.write(msg, LvlDebug, ctx, skipLevel)
}
// Info is a convenient alias for Root().Info
func Info(msg string, ctx ...interface{}) {
root.write(msg, LvlInfo, ctx, skipLevel)
}
// Warn is a convenient alias for Root().Warn
func Warn(msg string, ctx ...interface{}) {
root.write(msg, LvlWarn, ctx, skipLevel)
}
// Error is a convenient alias for Root().Error
func Error(msg string, ctx ...interface{}) {
root.write(msg, LvlError, ctx, skipLevel)
}
// Crit is a convenient alias for Root().Crit
func Crit(msg string, ctx ...interface{}) {
root.write(msg, LvlCrit, ctx, skipLevel)
os.Exit(1)
}
// Output is a convenient alias for write, allowing for the modification of
// the calldepth (number of stack frames to skip).
// calldepth influences the reported line number of the log message.
// A calldepth of zero reports the immediate caller of Output.
// Non-zero calldepth skips as many stack frames.
func Output(msg string, lvl Lvl, calldepth int, ctx ...interface{}) {
root.write(msg, lvl, ctx, calldepth+skipLevel)
}

@ -0,0 +1,57 @@
// +build !windows,!plan9
package log
import (
"log/syslog"
"strings"
)
// SyslogHandler opens a connection to the system syslog daemon by calling
// syslog.New and writes all records to it.
func SyslogHandler(priority syslog.Priority, tag string, fmtr Format) (Handler, error) {
wr, err := syslog.New(priority, tag)
return sharedSyslog(fmtr, wr, err)
}
// SyslogNetHandler opens a connection to a log daemon over the network and writes
// all log records to it.
func SyslogNetHandler(net, addr string, priority syslog.Priority, tag string, fmtr Format) (Handler, error) {
wr, err := syslog.Dial(net, addr, priority, tag)
return sharedSyslog(fmtr, wr, err)
}
func sharedSyslog(fmtr Format, sysWr *syslog.Writer, err error) (Handler, error) {
if err != nil {
return nil, err
}
h := FuncHandler(func(r *Record) error {
var syslogFn = sysWr.Info
switch r.Lvl {
case LvlCrit:
syslogFn = sysWr.Crit
case LvlError:
syslogFn = sysWr.Err
case LvlWarn:
syslogFn = sysWr.Warning
case LvlInfo:
syslogFn = sysWr.Info
case LvlDebug:
syslogFn = sysWr.Debug
case LvlTrace:
syslogFn = func(m string) error { return nil } // There's no syslog level for trace
}
s := strings.TrimSpace(string(fmtr.Format(r)))
return syslogFn(s)
})
return LazyHandler(&closingHandler{sysWr, h}), nil
}
func (m muster) SyslogHandler(priority syslog.Priority, tag string, fmtr Format) Handler {
return must(SyslogHandler(priority, tag, fmtr))
}
func (m muster) SyslogNetHandler(net, addr string, priority syslog.Priority, tag string, fmtr Format) Handler {
return must(SyslogNetHandler(net, addr, priority, tag, fmtr))
}

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2014 Simon Eskildsen
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

@ -0,0 +1,13 @@
// Based on ssh/terminal:
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build appengine
package term
// IsTty always returns false on AppEngine.
func IsTty(fd uintptr) bool {
return false
}

@ -0,0 +1,13 @@
// Based on ssh/terminal:
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !appengine
package term
import "syscall"
const ioctlReadTermios = syscall.TIOCGETA
type Termios syscall.Termios

@ -0,0 +1,18 @@
package term
import (
"syscall"
)
const ioctlReadTermios = syscall.TIOCGETA
// Go 1.2 doesn't include Termios for FreeBSD. This should be added in 1.3 and this could be merged with terminal_darwin.
type Termios struct {
Iflag uint32
Oflag uint32
Cflag uint32
Lflag uint32
Cc [20]uint8
Ispeed uint32
Ospeed uint32
}

@ -0,0 +1,14 @@
// Based on ssh/terminal:
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build !appengine
package term
import "syscall"
const ioctlReadTermios = syscall.TCGETS
type Termios syscall.Termios

@ -0,0 +1,7 @@
package term
import "syscall"
const ioctlReadTermios = syscall.TIOCGETA
type Termios syscall.Termios

@ -0,0 +1,20 @@
// Based on ssh/terminal:
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build linux,!appengine darwin freebsd openbsd netbsd
package term
import (
"syscall"
"unsafe"
)
// IsTty returns true if the given file descriptor is a terminal.
func IsTty(fd uintptr) bool {
var termios Termios
_, _, err := syscall.Syscall6(syscall.SYS_IOCTL, fd, ioctlReadTermios, uintptr(unsafe.Pointer(&termios)), 0, 0, 0)
return err == 0
}

@ -0,0 +1,7 @@
package term
import "syscall"
const ioctlReadTermios = syscall.TIOCGETA
type Termios syscall.Termios

@ -0,0 +1,9 @@
package term
import "golang.org/x/sys/unix"
// IsTty returns true if the given file descriptor is a terminal.
func IsTty(fd uintptr) bool {
_, err := unix.IoctlGetTermios(int(fd), unix.TCGETA)
return err == nil
}

@ -0,0 +1,26 @@
// Based on ssh/terminal:
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build windows
package term
import (
"syscall"
"unsafe"
)
var kernel32 = syscall.NewLazyDLL("kernel32.dll")
var (
procGetConsoleMode = kernel32.NewProc("GetConsoleMode")
)
// IsTty returns true if the given file descriptor is a terminal.
func IsTty(fd uintptr) bool {
var st uint32
r, _, e := syscall.Syscall(procGetConsoleMode.Addr(), 2, fd, uintptr(unsafe.Pointer(&st)), 0)
return r != 0 && e == 0
}

@ -21,12 +21,17 @@ const (
) )
//ConstructTransactionListMessage constructs serialized transactions //ConstructTransactionListMessage constructs serialized transactions
func ConstructTransactionListMessage(transactions []blockchain.Transaction) []byte { func ConstructTransactionListMessage(transactions []*blockchain.Transaction) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)}) byteBuffer := bytes.NewBuffer([]byte{byte(common.NODE)})
byteBuffer.WriteByte(byte(common.TRANSACTION)) byteBuffer.WriteByte(byte(common.TRANSACTION))
byteBuffer.WriteByte(byte(SEND)) byteBuffer.WriteByte(byte(SEND))
encoder := gob.NewEncoder(byteBuffer) encoder := gob.NewEncoder(byteBuffer)
encoder.Encode(transactions) // Copy over the tx data
txs := make([]blockchain.Transaction, len(transactions))
for i := range txs {
txs[i] = *transactions[i]
}
encoder.Encode(txs)
return byteBuffer.Bytes() return byteBuffer.Bytes()
} }

@ -3,194 +3,101 @@ package node
import ( import (
"harmony-benchmark/blockchain" "harmony-benchmark/blockchain"
"harmony-benchmark/consensus" "harmony-benchmark/consensus"
"harmony-benchmark/common" "harmony-benchmark/log"
"harmony-benchmark/p2p"
"log"
"net" "net"
"os" "os"
"time" "strconv"
"bytes" "sync"
"encoding/gob"
) )
// A node represents a program (machine) participating in the network var pendingTxMutex = &sync.Mutex{}
// Node represents a program (machine) participating in the network
// TODO(minhdoan, rj): consider using BlockChannel *chan blockchain.Block for efficiency.
type Node struct { type Node struct {
consensus *consensus.Consensus consensus *consensus.Consensus // Consensus object containing all consensus related data (e.g. committee members, signatures, commits)
BlockChannel chan blockchain.Block BlockChannel chan blockchain.Block // The channel to receive new blocks from Node
pendingTransactions []blockchain.Transaction pendingTransactions []*blockchain.Transaction // All the transactions received but not yet processed for consensus
transactionInConsensus []*blockchain.Transaction // The transactions selected into the new block and under consensus process
blockchain *blockchain.Blockchain // The blockchain for the shard where this node belongs
UtxoPool *blockchain.UTXOPool // The corresponding UTXO pool of the current blockchain
log log.Logger // Log utility
}
// Add new transactions to the pending transaction list
func (node *Node) addPendingTransactions(newTxs []*blockchain.Transaction) {
pendingTxMutex.Lock()
node.pendingTransactions = append(node.pendingTransactions, newTxs...)
pendingTxMutex.Unlock()
}
// Take out a subset of valid transactions from the pending transaction list
// Note the pending transaction list will then contain the rest of the txs
func (node *Node) getTransactionsForNewBlock() []*blockchain.Transaction {
pendingTxMutex.Lock()
selected, unselected := node.UtxoPool.SelectTransactionsForNewBlock(node.pendingTransactions)
node.pendingTransactions = unselected
pendingTxMutex.Unlock()
return selected
} }
// Start a server and process the request by a handler. // Start a server and process the request by a handler.
func (node *Node) StartServer(port string) { func (node *Node) StartServer(port string) {
listenOnPort(port, node.NodeHandler) node.log.Debug("Starting server", "node", node)
node.listenOnPort(port)
} }
func listenOnPort(port string, handler func(net.Conn)) { func (node *Node) listenOnPort(port string) {
listen, err := net.Listen("tcp4", ":"+port) listen, err := net.Listen("tcp4", ":"+port)
defer listen.Close() defer listen.Close()
if err != nil { if err != nil {
log.Fatalf("Socket listen port %s failed,%s", port, err) node.log.Crit("Socket listen port failed", "port", port, "err", err)
os.Exit(1) os.Exit(1)
} }
for { for {
conn, err := listen.Accept() conn, err := listen.Accept()
if err != nil { if err != nil {
log.Printf("Error listening on port: %s. Exiting.", port) node.log.Crit("Error listening on port. Exiting.", "port", port)
log.Fatalln(err)
continue continue
} }
go handler(conn) go node.NodeHandler(conn)
} }
} }
// Handler of the leader node. func (node *Node) String() string {
func (node *Node) NodeHandler(conn net.Conn) { return node.consensus.String()
defer conn.Close()
// Read p2p message payload
content, err := p2p.ReadMessageContent(conn)
consensus := node.consensus
if err != nil {
if consensus.IsLeader {
log.Printf("[Leader] Read p2p data failed:%s", err)
} else {
log.Printf("[Slave] Read p2p data failed:%s", err)
}
return
}
msgCategory, err := common.GetMessageCategory(content)
if err != nil {
if consensus.IsLeader {
log.Printf("[Leader] Read node type failed:%s", err)
} else {
log.Printf("[Slave] Read node type failed:%s", err)
}
return
}
msgType, err := common.GetMessageType(content)
if err != nil {
if consensus.IsLeader {
log.Printf("[Leader] Read action type failed:%s", err)
} else {
log.Printf("[Slave] Read action type failed:%s", err)
}
return
}
msgPayload, err := common.GetMessagePayload(content)
if err != nil {
if consensus.IsLeader {
log.Printf("[Leader] Read message payload failed:%s", err)
} else {
log.Printf("[Slave] Read message payload failed:%s", err)
}
return
}
switch msgCategory {
case common.COMMITTEE:
actionType := common.CommitteeMessageType(msgType)
switch actionType {
case common.CONSENSUS:
if consensus.IsLeader {
consensus.ProcessMessageLeader(msgPayload)
} else {
consensus.ProcessMessageValidator(msgPayload)
}
}
case common.NODE:
actionType := common.NodeMessageType(msgType)
switch actionType {
case common.TRANSACTION:
node.transactionMessageHandler(msgPayload)
case common.CONTROL:
controlType := msgPayload[0]
if ControlMessageType(controlType) == STOP {
log.Println("Stopping Node")
os.Exit(0)
}
}
}
} }
func (node *Node) transactionMessageHandler(msgPayload []byte) { // [Testing code] Should be deleted for production
txMessageType := TransactionMessageType(msgPayload[0]) // Create in genesis block 1000 transactions which assign 1000 token to each address in [1 - 1000]
func (node *Node) AddMoreFakeTransactions() {
switch txMessageType { txs := make([]*blockchain.Transaction, 1000)
case SEND: for i := range txs {
txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the SEND messge type txs[i] = blockchain.NewCoinbaseTX(strconv.Itoa(i), "")
txList := new([]blockchain.Transaction)
err := txDecoder.Decode(&txList)
if err != nil {
log.Println("Failed deserializing transaction list")
}
node.pendingTransactions = append(node.pendingTransactions, *txList...)
case REQUEST:
reader := bytes.NewBuffer(msgPayload[1:])
var txIds map[[32]byte]bool
txId := make([]byte, 32) // 32 byte hash Id
for {
_, err := reader.Read(txId)
if err != nil {
break
}
txIds[getFixedByteTxId(txId)] = true
}
var txToReturn []blockchain.Transaction
for _, tx := range node.pendingTransactions {
if txIds[getFixedByteTxId(tx.ID)] {
txToReturn = append(txToReturn, tx)
}
}
// TODO: return the transaction list to requester
}
}
// Copy the txId byte slice over to 32 byte array so the map can key on it
func getFixedByteTxId(txId []byte) [32]byte {
var id [32]byte
for i := range id {
id[i] = txId[i]
}
return id
}
func (node *Node) WaitForConsensusReady(readySignal chan int) {
for { // keep waiting for consensus ready
<-readySignal
// create a new block
newBlock := new(blockchain.Block)
for {
if len(node.pendingTransactions) >= 10 {
log.Println("Creating new block")
// TODO (Minh): package actual transactions
// For now, just take out 10 transactions
var txList []*blockchain.Transaction
for _, tx := range node.pendingTransactions[0:10] {
txList = append(txList, &tx)
}
node.pendingTransactions = node.pendingTransactions[10:]
newBlock = blockchain.NewBlock(txList, []byte{})
break
}
time.Sleep(1 * time.Second) // Periodically check whether we have enough transactions to package into block.
}
node.BlockChannel <- *newBlock
} }
node.blockchain.Blocks[0].Transactions = append(node.blockchain.Blocks[0].Transactions, txs...)
node.UtxoPool.Update(txs)
} }
// Create a new Node // Create a new Node
func NewNode(consensus *consensus.Consensus) Node { func NewNode(consensus *consensus.Consensus) Node {
node := Node{} node := Node{}
// Consensus and associated channel to communicate blocks
node.consensus = consensus node.consensus = consensus
node.BlockChannel = make(chan blockchain.Block) node.BlockChannel = make(chan blockchain.Block)
// Genesis Block
genesisBlock := &blockchain.Blockchain{}
genesisBlock.Blocks = make([]*blockchain.Block, 0)
coinbaseTx := blockchain.NewCoinbaseTX("harmony", "1")
genesisBlock.Blocks = append(genesisBlock.Blocks, blockchain.NewGenesisBlock(coinbaseTx))
node.blockchain = genesisBlock
// UTXO pool from Genesis block
node.UtxoPool = blockchain.CreateUTXOPoolFromGenesisBlockChain(node.blockchain)
// Logger
node.log = node.consensus.Log
return node return node
} }

@ -0,0 +1,155 @@
package node
import (
"bytes"
"encoding/gob"
"harmony-benchmark/blockchain"
"harmony-benchmark/common"
"harmony-benchmark/p2p"
"net"
"os"
"time"
)
// NodeHandler handles a new incoming connection.
func (node *Node) NodeHandler(conn net.Conn) {
defer conn.Close()
// Read p2p message payload
content, err := p2p.ReadMessageContent(conn)
if err != nil {
node.log.Error("Read p2p data failed", "err", err, "node", node)
return
}
consensus := node.consensus
msgCategory, err := common.GetMessageCategory(content)
if err != nil {
node.log.Error("Read node type failed", "err", err, "node", node)
return
}
msgType, err := common.GetMessageType(content)
if err != nil {
node.log.Error("Read action type failed", "err", err, "node", node)
return
}
msgPayload, err := common.GetMessagePayload(content)
if err != nil {
node.log.Error("Read message payload failed", "err", err, "node", node)
return
}
switch msgCategory {
case common.COMMITTEE:
actionType := common.CommitteeMessageType(msgType)
switch actionType {
case common.CONSENSUS:
if consensus.IsLeader {
consensus.ProcessMessageLeader(msgPayload)
} else {
consensus.ProcessMessageValidator(msgPayload)
}
}
case common.NODE:
actionType := common.NodeMessageType(msgType)
switch actionType {
case common.TRANSACTION:
node.transactionMessageHandler(msgPayload)
case common.CONTROL:
controlType := msgPayload[0]
if ControlMessageType(controlType) == STOP {
node.log.Debug("Stopping Node", "node", node)
os.Exit(0)
}
}
}
}
func (node *Node) transactionMessageHandler(msgPayload []byte) {
txMessageType := TransactionMessageType(msgPayload[0])
switch txMessageType {
case SEND:
txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the SEND messge type
txList := new([]*blockchain.Transaction)
err := txDecoder.Decode(&txList)
if err != nil {
node.log.Error("Failed deserializing transaction list", "node", node)
}
node.addPendingTransactions(*txList)
case REQUEST:
reader := bytes.NewBuffer(msgPayload[1:])
var txIds map[[32]byte]bool
buf := make([]byte, 32) // 32 byte hash Id
for {
_, err := reader.Read(buf)
if err != nil {
break
}
var txId [32]byte
copy(txId[:], buf)
txIds[txId] = true
}
var txToReturn []*blockchain.Transaction
for _, tx := range node.pendingTransactions {
if txIds[tx.ID] {
txToReturn = append(txToReturn, tx)
}
}
// TODO: return the transaction list to requester
}
}
// WaitForConsensusReady ...
func (node *Node) WaitForConsensusReady(readySignal chan int) {
node.log.Debug("Waiting for consensus ready", "node", node)
var newBlock *blockchain.Block
for { // keep waiting for consensus ready
<-readySignal
//node.log.Debug("Adding new block", "currentChainSize", len(node.blockchain.Blocks), "numTxs", len(node.blockchain.GetLatestBlock().Transactions), "PrevHash", node.blockchain.GetLatestBlock().PrevBlockHash, "Hash", node.blockchain.GetLatestBlock().Hash)
for {
// Once we have more than 10 transactions pending we will try creating a new block
if len(node.pendingTransactions) >= 10 {
selectedTxs := node.getTransactionsForNewBlock()
if len(selectedTxs) == 0 {
node.log.Debug("No valid transactions exist", "pendingTx", len(node.pendingTransactions))
} else {
node.log.Debug("Creating new block", "numTxs", len(selectedTxs), "pendingTxs", len(node.pendingTransactions), "currentChainSize", len(node.blockchain.Blocks))
node.transactionInConsensus = selectedTxs
newBlock = blockchain.NewBlock(selectedTxs, node.blockchain.GetLatestBlock().Hash)
break
}
}
// If not enough transactions to run consensus,
// periodically check whether we have enough transactions to package into block.
time.Sleep(1 * time.Second)
}
// Send the new block to consensus so it can be confirmed.
node.BlockChannel <- *newBlock
}
}
func (node *Node) VerifyNewBlock(newBlock *blockchain.Block) bool {
return node.UtxoPool.VerifyTransactions(newBlock.Transactions)
}
func (node *Node) AddNewBlockToBlockchain(newBlock *blockchain.Block) {
// Add it to blockchain
node.blockchain.Blocks = append(node.blockchain.Blocks, newBlock)
// Update UTXO pool
node.UtxoPool.Update(newBlock.Transactions)
// Clear transaction-in-consensus list
node.transactionInConsensus = []*blockchain.Transaction{}
}

@ -0,0 +1,34 @@
package node
import (
"harmony-benchmark/p2p"
"testing"
"harmony-benchmark/consensus"
)
func TestNewNewNode(test *testing.T) {
leader := p2p.Peer{Ip: "1", Port: "2"}
validator := p2p.Peer{Ip: "3", Port: "5"}
consensus := consensus.NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
node := NewNode(&consensus)
if node.consensus == nil {
test.Error("Consensus is not initialized for the node")
}
if node.blockchain == nil {
test.Error("Blockchain is not initialized for the node")
}
if len(node.blockchain.Blocks) != 1 {
test.Error("Genesis block is not initialized for the node")
}
if len(node.blockchain.Blocks[0].Transactions) != 1 {
test.Error("Coinbase TX is not initialized for the node")
}
if node.UtxoPool == nil {
test.Error("Utxo pool is not initialized for the node")
}
}

@ -8,17 +8,14 @@ import (
"strings" "strings"
) )
// Object for a p2p peer (node) // Peer is the object for a p2p peer (node)
type Peer struct { type Peer struct {
// Ip address of the peer Ip string // Ip address of the peer
Ip string Port string // Port number of the peer
// Port number of the peer PubKey string // Public key of the peer
Port string
// Public key of the peer
PubKey string
} }
// Send the message to the peer // SendMessage sends the message to the peer
func SendMessage(peer Peer, msg []byte) { func SendMessage(peer Peer, msg []byte) {
// Construct normal p2p message // Construct normal p2p message
content := ConstructP2pMessage(byte(0), msg) content := ConstructP2pMessage(byte(0), msg)
@ -26,7 +23,7 @@ func SendMessage(peer Peer, msg []byte) {
send(peer.Ip, peer.Port, content) send(peer.Ip, peer.Port, content)
} }
// Send the message to a list of peers // BroadcastMessage sends the message to a list of peers
func BroadcastMessage(peers []Peer, msg []byte) { func BroadcastMessage(peers []Peer, msg []byte) {
// Construct broadcast p2p message // Construct broadcast p2p message
content := ConstructP2pMessage(byte(17), msg) content := ConstructP2pMessage(byte(17), msg)
@ -36,7 +33,7 @@ func BroadcastMessage(peers []Peer, msg []byte) {
} }
} }
// Construct the p2p message as [messageType, contentSize, content] // ConstructP2pMessage constructs the p2p message as [messageType, contentSize, content]
func ConstructP2pMessage(msgType byte, content []byte) []byte { func ConstructP2pMessage(msgType byte, content []byte) []byte {
firstByte := byte(17) // messageType 0x11 firstByte := byte(17) // messageType 0x11

Loading…
Cancel
Save