merge conflict

pull/76/head
Richard Liu 6 years ago
commit 780fedaec4
  1. 9
      README.md
  2. 62
      attack/attack.go
  3. 4
      benchmark.go
  4. 4
      blockchain/blockchain.go
  5. 1
      blockchain/transaction.go
  6. 29
      blockchain/utxopool.go
  7. 4
      blockchain/utxopool_test.go
  8. 293
      client/btctxgen/main.go
  9. 22
      client/config/config.go
  10. 4
      client/txgen/main.go
  11. 4
      client/wallet/main.go
  12. 6
      consensus/consensus.go
  13. 24
      consensus/consensus_leader.go
  14. 8
      consensus/consensus_leader_msg.go
  15. 2
      consensus/consensus_leader_msg_test.go
  16. 10
      consensus/consensus_state.go
  17. 20
      consensus/consensus_validator.go
  18. 4
      consensus/consensus_validator_msg_test.go
  19. 1
      crypto/curve.go
  20. 1
      crypto/sha256.go
  21. 18
      db/db.go
  22. 20
      db/db_test.go
  23. 2
      db/interface.go
  24. 16
      db/memory_db.go
  25. 2
      identitychain/identityblock.go
  26. 4
      identitychain/identitychain_handler.go
  27. 1
      log/term/terminal_darwin.go
  28. 3
      node/node.go
  29. 24
      node/node_handler.go
  30. 2
      p2p/peer.go
  31. 5
      pow/api.go
  32. 28
      proto/common.go
  33. 68
      proto/consensus/consensus.go
  34. 8
      proto/identity/identity.go
  35. 18
      proto/node/node.go
  36. 3
      syncing/syncing.go
  37. 2
      utils/distribution_config.go

@ -1,11 +1,12 @@
# Harmony Benchmark # Harmony Benchmark
[![Build Status](https://travis-ci.com/simple-rules/harmony-benchmark.svg?token=DnoYvYiTAk7pqTo9XsTi&branch=master)](https://travis-ci.com/simple-rules/harmony-benchmark) [![Build Status](https://travis-ci.com/simple-rules/harmony-benchmark.svg?token=DnoYvYiTAk7pqTo9XsTi&branch=master)](https://travis-ci.com/simple-rules/harmony-benchmark)
## Golang Coding Convention ## Coding Guidelines
* In general, we should follow [effective_go](https://golang.org/doc/effective_go.html)
* Code must adhere to the official [Go formatting guidelines](https://golang.org/doc/effective_go.html#formatting) (i.e. uses [gofmt](https://golang.org/cmd/gofmt/)).
* Code must be documented adhering to the official Go [commentary](https://golang.org/doc/effective_go.html#commentary) guidelines.
* Follow [effective_go](https://golang.org/doc/effective_go.html)
* Constant enum should follow CamelCase.
* Comments of each element in a struct is written right after the element.
## Dev Environment Setup ## Dev Environment Setup

@ -9,67 +9,74 @@ import (
"github.com/simple-rules/harmony-benchmark/log" "github.com/simple-rules/harmony-benchmark/log"
) )
// Constants used for attack model.
const ( const (
DroppingTickDuration = 2 * time.Second DroppingTickDuration = 2 * time.Second
HitRate = 10 HitRate = 10
DelayResponseDuration = 10 * time.Second DelayResponseDuration = 10 * time.Second
ConsensusIdThresholdMin = 10 ConsensusIDThresholdMin = 10
ConsensusIdThresholdMax = 100 ConsensusIDThresholdMax = 100
) )
type AttackType byte // Type is the type of attack model.
type Type byte
// Constants of different attack models.
const ( const (
KilledItself AttackType = iota KilledItself Type = iota
DelayResponse DelayResponse
IncorrectResponse IncorrectResponse
) )
// AttackModel contains different models of attacking. // Model contains different models of attacking.
type Attack struct { type Model struct {
AttackEnabled bool AttackEnabled bool
attackType AttackType attackType Type
ConsensusIdThreshold uint32 ConsensusIDThreshold uint32
readyByConsensusThreshold bool readyByConsensusThreshold bool
log log.Logger // Log utility log log.Logger // Log utility
} }
var attack *Attack var attackModel *Model
var once sync.Once var once sync.Once
// GetInstance returns attack model by using singleton pattern. // GetInstance returns attack model by using singleton pattern.
func GetInstance() *Attack { func GetInstance() *Model {
once.Do(func() { once.Do(func() {
attack = &Attack{} attackModel = &Model{}
attack.Init() attackModel.Init()
}) })
return attack return attackModel
} }
func (attack *Attack) Init() { // Init initializes attack model.
func (attack *Model) Init() {
attack.AttackEnabled = false attack.AttackEnabled = false
attack.readyByConsensusThreshold = false attack.readyByConsensusThreshold = false
} }
func (attack *Attack) SetAttackEnabled(AttackEnabled bool) { // SetAttackEnabled sets attack model enabled.
func (attack *Model) SetAttackEnabled(AttackEnabled bool) {
attack.AttackEnabled = AttackEnabled attack.AttackEnabled = AttackEnabled
if AttackEnabled { if AttackEnabled {
attack.attackType = AttackType(rand.Intn(3)) attack.attackType = Type(rand.Intn(3))
attack.ConsensusIdThreshold = uint32(ConsensusIdThresholdMin + rand.Intn(ConsensusIdThresholdMax-ConsensusIdThresholdMin)) attack.ConsensusIDThreshold = uint32(ConsensusIDThresholdMin + rand.Intn(ConsensusIDThresholdMax-ConsensusIDThresholdMin))
} }
} }
func (attack *Attack) SetLogger(log log.Logger) { // SetLogger sets the logger for doing logging.
func (attack *Model) SetLogger(log log.Logger) {
attack.log = log attack.log = log
} }
func (attack *Attack) Run() { // Run runs enabled attacks.
func (attack *Model) Run() {
attack.NodeKilledByItSelf() attack.NodeKilledByItSelf()
attack.DelayResponse() attack.DelayResponse()
} }
// NodeKilledByItSelf runs killing itself attack // NodeKilledByItSelf runs killing itself attack
func (attack *Attack) NodeKilledByItSelf() { func (attack *Model) NodeKilledByItSelf() {
if !attack.AttackEnabled || attack.attackType != KilledItself || !attack.readyByConsensusThreshold { if !attack.AttackEnabled || attack.attackType != KilledItself || !attack.readyByConsensusThreshold {
return return
} }
@ -80,29 +87,32 @@ func (attack *Attack) NodeKilledByItSelf() {
} }
} }
func (attack *Attack) DelayResponse() { // DelayResponse does attack by delaying response.
func (attack *Model) DelayResponse() {
if !attack.AttackEnabled || attack.attackType != DelayResponse || !attack.readyByConsensusThreshold { if !attack.AttackEnabled || attack.attackType != DelayResponse || !attack.readyByConsensusThreshold {
return return
} }
if rand.Intn(HitRate) == 0 { if rand.Intn(HitRate) == 0 {
attack.log.Debug("******************Attack: DelayResponse******************", "PID: ", os.Getpid()) attack.log.Debug("******************Model: DelayResponse******************", "PID: ", os.Getpid())
time.Sleep(DelayResponseDuration) time.Sleep(DelayResponseDuration)
} }
} }
func (attack *Attack) IncorrectResponse() bool { // IncorrectResponse returns if the attack model enable incorrect responding.
func (attack *Model) IncorrectResponse() bool {
if !attack.AttackEnabled || attack.attackType != IncorrectResponse || !attack.readyByConsensusThreshold { if !attack.AttackEnabled || attack.attackType != IncorrectResponse || !attack.readyByConsensusThreshold {
return false return false
} }
if rand.Intn(HitRate) == 0 { if rand.Intn(HitRate) == 0 {
attack.log.Debug("******************Attack: IncorrectResponse******************", "PID: ", os.Getpid()) attack.log.Debug("******************Model: IncorrectResponse******************", "PID: ", os.Getpid())
return true return true
} }
return false return false
} }
func (attack *Attack) UpdateConsensusReady(consensusID uint32) { // UpdateConsensusReady enables an attack type given the current consensusID.
if consensusID > attack.ConsensusIdThreshold { func (attack *Model) UpdateConsensusReady(consensusID uint32) {
if consensusID > attack.ConsensusIDThreshold {
attack.readyByConsensusThreshold = true attack.readyByConsensusThreshold = true
} }
} }

@ -25,6 +25,7 @@ var (
commit string commit string
) )
// Constants used by the benchmark.
const ( const (
AttackProbability = 20 AttackProbability = 20
) )
@ -41,6 +42,7 @@ func attackDetermination(attackedMode int) bool {
return false return false
} }
// InitLDBDatabase initializes a LDBDatabase.
func InitLDBDatabase(ip string, port string) (*db.LDBDatabase, error) { func InitLDBDatabase(ip string, port string) (*db.LDBDatabase, error) {
// TODO(minhdoan): Refactor this. // TODO(minhdoan): Refactor this.
dbFileName := "/tmp/harmony_" + ip + port + ".dat" dbFileName := "/tmp/harmony_" + ip + port + ".dat"
@ -117,7 +119,7 @@ func main() {
loggingInit(*logFolder, role, *ip, *port, *onlyLogTps) loggingInit(*logFolder, role, *ip, *port, *onlyLogTps)
// Initialize leveldb if dbSupported. // Initialize leveldb if dbSupported.
var ldb *db.LDBDatabase = nil var ldb *db.LDBDatabase
if *dbSupported { if *dbSupported {
ldb, _ = InitLDBDatabase(*ip, *port) ldb, _ = InitLDBDatabase(*ip, *port)

@ -21,7 +21,7 @@ func (bc *Blockchain) FindBlock(blockHash []byte) *Block {
return nil return nil
} }
for _, block := range bc.Blocks { for _, block := range bc.Blocks {
if bytes.Compare(block.Hash[:], blockHash[:]) == 0 { if bytes.Equal(block.Hash[:], blockHash[:]) {
return block return block
} }
} }
@ -71,7 +71,7 @@ func (bc *Blockchain) FindUnspentUtxos(address [20]byte) map[TxID]map[uint32]TXO
for outIdx, txOutput := range tx.TxOutput { for outIdx, txOutput := range tx.TxOutput {
shouldContinue := false shouldContinue := false
for index, _ := range spentTXOs[txID] { for index := range spentTXOs[txID] {
if spentTXOs[txID][index] == uint32(outIdx) { if spentTXOs[txID][index] == uint32(outIdx) {
shouldContinue = true shouldContinue = true
break break

@ -47,6 +47,7 @@ type TXOutput struct {
ShardID uint32 // The Id of the shard where this UTXO belongs ShardID uint32 // The Id of the shard where this UTXO belongs
} }
// TxID structure type.
type TxID = [32]byte type TxID = [32]byte
// OutPoint defines a data type that is used to track previous // OutPoint defines a data type that is used to track previous

@ -84,7 +84,7 @@ func (utxoPool *UTXOPool) VerifyTransactions(transactions []*Transaction) bool {
spentTXOs := make(map[[20]byte]map[string]map[uint32]bool) spentTXOs := make(map[[20]byte]map[string]map[uint32]bool)
if utxoPool != nil { if utxoPool != nil {
for _, tx := range transactions { for _, tx := range transactions {
if err, crossShard := utxoPool.VerifyOneTransaction(tx, &spentTXOs); !crossShard && err != nil { if crossShard, err := utxoPool.VerifyOneTransaction(tx, &spentTXOs); !crossShard && err != nil {
return false return false
} }
} }
@ -122,11 +122,11 @@ func (utxoPool *UTXOPool) VerifyStateBlock(stateBlock *Block) bool {
// VerifyOneTransaction verifies if a list of transactions valid. // VerifyOneTransaction verifies if a list of transactions valid.
// Add another sanity check function (e.g. spending the same utxo) called before this one. // Add another sanity check function (e.g. spending the same utxo) called before this one.
func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction, spentTXOs *map[[20]byte]map[string]map[uint32]bool) (err error, crossShard bool) { func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction, spentTXOs *map[[20]byte]map[string]map[uint32]bool) (crossShard bool, err error) {
var nilPubKey [32]byte var nilPubKey [32]byte
// TODO(ricl): remove. just for btc replay. // TODO(ricl): remove. just for btc replay.
if tx.PublicKey == nilPubKey { if tx.PublicKey == nilPubKey {
return nil, false return false, nil
} }
if len(tx.Proofs) > 1 { if len(tx.Proofs) > 1 {
return utxoPool.VerifyUnlockTransaction(tx) return utxoPool.VerifyUnlockTransaction(tx)
@ -149,7 +149,7 @@ func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction, spentTXOs *map[[
// Check if the transaction with the address is spent or not. // Check if the transaction with the address is spent or not.
if val, ok := (*spentTXOs)[in.Address][inTxID][index]; ok { if val, ok := (*spentTXOs)[in.Address][inTxID][index]; ok {
if val { if val {
return errors.New("TxInput is already spent"), crossShard return crossShard, errors.New("TxInput is already spent")
} }
} }
// Mark the transactions with the address and index spent. // Mark the transactions with the address and index spent.
@ -167,7 +167,7 @@ func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction, spentTXOs *map[[
inTotal += val inTotal += val
} else { } else {
utxoPool.mutex.Unlock() utxoPool.mutex.Unlock()
return errors.New("Specified TxInput does not exist in utxo pool"), crossShard return crossShard, errors.New("Specified TxInput does not exist in utxo pool")
} }
utxoPool.mutex.Unlock() utxoPool.mutex.Unlock()
} }
@ -184,11 +184,11 @@ func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction, spentTXOs *map[[
// TODO: improve this checking logic // TODO: improve this checking logic
if (crossShard && inTotal > outTotal) || (!crossShard && inTotal != outTotal) { if (crossShard && inTotal > outTotal) || (!crossShard && inTotal != outTotal) {
return errors.New("Input and output amount doesn't match"), crossShard return crossShard, errors.New("Input and output amount doesn't match")
} }
if inTotal == 0 { if inTotal == 0 {
return errors.New("Input amount is 0"), false // Here crossShard is false, because if there is no business for this shard, it's effectively not crossShard no matter what. return false, errors.New("Input amount is 0") // Here crossShard is false, because if there is no business for this shard, it's effectively not crossShard no matter what.
} }
// Verify the signature // Verify the signature
@ -200,13 +200,13 @@ func (utxoPool *UTXOPool) VerifyOneTransaction(tx *Transaction, spentTXOs *map[[
tempErr = schnorr.Verify(crypto.Ed25519Curve, pubKey, tx.GetContentToVerify(), tx.Signature[:]) tempErr = schnorr.Verify(crypto.Ed25519Curve, pubKey, tx.GetContentToVerify(), tx.Signature[:])
if tempErr != nil { if tempErr != nil {
log.Error("Failed to verify signature", "error", tempErr, "public key", pubKey, "pubKey in bytes", tx.PublicKey[:]) log.Error("Failed to verify signature", "error", tempErr, "public key", pubKey, "pubKey in bytes", tx.PublicKey[:])
return errors.New("Invalid signature"), crossShard return crossShard, errors.New("Invalid signature")
} }
return nil, crossShard return crossShard, nil
} }
// VerifyUnlockTransaction verifies a cross shard transaction that contains proofs for unlock-to-commit/abort. // VerifyUnlockTransaction verifies a cross shard transaction that contains proofs for unlock-to-commit/abort.
func (utxoPool *UTXOPool) VerifyUnlockTransaction(tx *Transaction) (err error, crossShard bool) { func (utxoPool *UTXOPool) VerifyUnlockTransaction(tx *Transaction) (crossShard bool, err error) {
err = nil err = nil
crossShard = false // unlock transaction is treated as crossShard=false because it will be finalized now (doesn't need more steps) crossShard = false // unlock transaction is treated as crossShard=false because it will be finalized now (doesn't need more steps)
txInputs := make(map[TXInput]bool) txInputs := make(map[TXInput]bool)
@ -370,7 +370,7 @@ func (utxoPool *UTXOPool) UpdateOneTransaction(tx *Transaction) {
// VerifyOneTransactionAndUpdate verifies and update a valid transaction. // VerifyOneTransactionAndUpdate verifies and update a valid transaction.
// Return false if the transaction is not valid. // Return false if the transaction is not valid.
func (utxoPool *UTXOPool) VerifyOneTransactionAndUpdate(tx *Transaction) bool { func (utxoPool *UTXOPool) VerifyOneTransactionAndUpdate(tx *Transaction) bool {
if err, _ := utxoPool.VerifyOneTransaction(tx, nil); err == nil { if _, err := utxoPool.VerifyOneTransaction(tx, nil); err == nil {
utxoPool.UpdateOneTransaction(tx) utxoPool.UpdateOneTransaction(tx)
return true return true
} }
@ -416,7 +416,7 @@ func (utxoPool *UTXOPool) SelectTransactionsForNewBlock(transactions []*Transact
selected, unselected, invalid, crossShardTxs := []*Transaction{}, []*Transaction{}, []*Transaction{}, []*CrossShardTxAndProof{} selected, unselected, invalid, crossShardTxs := []*Transaction{}, []*Transaction{}, []*Transaction{}, []*CrossShardTxAndProof{}
spentTXOs := make(map[[20]byte]map[string]map[uint32]bool) spentTXOs := make(map[[20]byte]map[string]map[uint32]bool)
for _, tx := range transactions { for _, tx := range transactions {
err, crossShard := utxoPool.VerifyOneTransaction(tx, &spentTXOs) crossShard, err := utxoPool.VerifyOneTransaction(tx, &spentTXOs)
if len(selected) < maxNumTxs { if len(selected) < maxNumTxs {
//if err != nil && rand.Intn(10) < 1 { //if err != nil && rand.Intn(10) < 1 {
@ -472,10 +472,7 @@ func (utxoPool *UTXOPool) LockedUtxoExists(address [20]byte, txID string, index
return false return false
} }
_, ok = utxoPool.LockedUtxoMap[address][txID][index] _, ok = utxoPool.LockedUtxoMap[address][txID][index]
if !ok { return ok
return false
}
return true
} }
// DeleteOneLockedUtxo deletes one balance item of UTXOPool and clean up if possible. // DeleteOneLockedUtxo deletes one balance item of UTXOPool and clean up if possible.

@ -16,7 +16,7 @@ func TestVerifyOneTransactionAndUpdate(t *testing.T) {
t.Error("failed to create a new transaction.") t.Error("failed to create a new transaction.")
} }
if err, _ := utxoPool.VerifyOneTransaction(tx, nil); err != nil { if _, err := utxoPool.VerifyOneTransaction(tx, nil); err != nil {
t.Error("failed to verify a valid transaction.") t.Error("failed to verify a valid transaction.")
} }
utxoPool.VerifyOneTransactionAndUpdate(tx) utxoPool.VerifyOneTransactionAndUpdate(tx)
@ -35,7 +35,7 @@ func TestVerifyOneTransactionFail(t *testing.T) {
} }
tx.TxInput = append(tx.TxInput, tx.TxInput[0]) tx.TxInput = append(tx.TxInput, tx.TxInput[0])
if err, _ := utxoPool.VerifyOneTransaction(tx, nil); err == nil { if _, err := utxoPool.VerifyOneTransaction(tx, nil); err == nil {
t.Error("Tx with multiple identical TxInput shouldn't be valid") t.Error("Tx with multiple identical TxInput shouldn't be valid")
} }
} }

@ -0,0 +1,293 @@
/*
The btctxgen iterates the btc tx history block by block, transaction by transaction.
The btxtxiter provide a simple api called `NextTx` for us to move thru TXs one by one.
Same as txgen, iterate on each shard to generate simulated TXs (GenerateSimulatedTransactions):
1. Get a new btc tx
2. If it's a coinbase tx, create a corresponding coinbase tx in our blockchain
3. Otherwise, create a normal TX, which might be cross-shard and might not, depending on whether all the TX inputs belong to the current shard.
Same as txgen, send single shard tx shard by shard, then broadcast cross shard tx.
TODO
Some todos for ricl
* correct the logic to outputing to one of the input shard, rather than the current shard
*/
package main
import (
"flag"
"fmt"
"sync"
"time"
"github.com/simple-rules/harmony-benchmark/blockchain"
"github.com/simple-rules/harmony-benchmark/client"
"github.com/simple-rules/harmony-benchmark/client/btctxiter"
client_config "github.com/simple-rules/harmony-benchmark/client/config"
"github.com/simple-rules/harmony-benchmark/consensus"
"github.com/simple-rules/harmony-benchmark/crypto/pki"
"github.com/simple-rules/harmony-benchmark/log"
"github.com/simple-rules/harmony-benchmark/node"
"github.com/simple-rules/harmony-benchmark/p2p"
proto_node "github.com/simple-rules/harmony-benchmark/proto/node"
)
type txGenSettings struct {
crossShard bool
maxNumTxsPerBatch int
}
type TXRef struct {
txID [32]byte
shardID uint32
toAddress [20]byte // we use the same toAddress in btc and hmy
}
var (
utxoPoolMutex sync.Mutex
setting txGenSettings
iter btctxiter.BTCTXIterator
utxoMapping map[string]TXRef // btcTXID to { txID, shardID }
// map from bitcoin address to a int value (the privKey in hmy)
addressMapping map[[20]byte]int
currentInt int
)
func getHmyInt(btcAddr [20]byte) int {
var privKey int
if privKey, ok := addressMapping[btcAddr]; !ok { // If cannot find key
privKey = currentInt
addressMapping[btcAddr] = privKey
currentInt++
}
return privKey
}
// Generates at most "maxNumTxs" number of simulated transactions based on the current UtxoPools of all shards.
// The transactions are generated by going through the existing utxos and
// randomly select a subset of them as the input for each new transaction. The output
// address of the new transaction are randomly selected from [0 - N), where N is the total number of fake addresses.
//
// When crossShard=true, besides the selected utxo input, select another valid utxo as input from the same address in a second shard.
// Similarly, generate another utxo output in that second shard.
//
// NOTE: the genesis block should contain N coinbase transactions which add
// token (1000) to each address in [0 - N). See node.AddTestingAddresses()
//
// Params:
// shardID - the shardID for current shard
// dataNodes - nodes containing utxopools of all shards
// Returns:
// all single-shard txs
// all cross-shard txs
func generateSimulatedTransactions(shardID int, dataNodes []*node.Node) ([]*blockchain.Transaction, []*blockchain.Transaction) {
/*
UTXO map structure:
{
address: {
txID: {
outputIndex: value
}
}
}
*/
utxoPoolMutex.Lock()
txs := []*blockchain.Transaction{}
crossTxs := []*blockchain.Transaction{}
nodeShardID := dataNodes[shardID].Consensus.ShardID
cnt := 0
LOOP:
for {
btcTx := iter.NextTx()
if btcTx == nil {
log.Error("Failed to parse tx", "height", iter.GetBlockIndex())
}
tx := blockchain.Transaction{}
isCrossShardTx := false
if btctxiter.IsCoinBaseTx(btcTx) {
// ricl: coinbase tx should just have one txo
btcTXO := btcTx.Vout[0]
btcTXOAddr := btcTXO.ScriptPubKey.Addresses[0]
var toAddress [20]byte
copy(toAddress[:], btcTXOAddr) // TODO(ricl): string to [20]byte
hmyInt := getHmyInt(toAddress)
tx = *blockchain.NewCoinbaseTX(pki.GetAddressFromInt(hmyInt), "", nodeShardID)
utxoMapping[btcTx.Hash] = TXRef{tx.ID, nodeShardID, toAddress}
} else {
var btcFromAddresses [][20]byte
for _, btcTXI := range btcTx.Vin {
btcTXIDStr := btcTXI.Txid
txRef := utxoMapping[btcTXIDStr] // find the corresponding harmony tx info
if txRef.shardID != nodeShardID {
isCrossShardTx = true
}
tx.TxInput = append(tx.TxInput, *blockchain.NewTXInput(blockchain.NewOutPoint(&txRef.txID, btcTXI.Vout), [20]byte{}, txRef.shardID))
// Add the from address to array, so that we can later use it to sign the tx.
btcFromAddresses = append(btcFromAddresses, txRef.toAddress)
}
for _, btcTXO := range btcTx.Vout {
for _, btcTXOAddr := range btcTXO.ScriptPubKey.Addresses {
var toAddress [20]byte
copy(toAddress[:], btcTXOAddr) //TODO(ricl): string to [20]byte
txo := blockchain.TXOutput{Amount: int(btcTXO.Value), Address: toAddress, ShardID: nodeShardID}
tx.TxOutput = append(tx.TxOutput, txo)
utxoMapping[btcTx.Txid] = TXRef{tx.ID, nodeShardID, toAddress}
}
}
// get private key and sign the tx
for _, btcFromAddress := range btcFromAddresses {
hmyInt := getHmyInt(btcFromAddress)
tx.SetID() // TODO(RJ): figure out the correct way to set Tx ID.
tx.Sign(pki.GetPrivateKeyScalarFromInt(hmyInt))
}
}
if isCrossShardTx {
crossTxs = append(crossTxs, &tx)
} else {
txs = append(txs, &tx)
}
// log.Debug("[Generator] transformed btc tx", "block height", iter.GetBlockIndex(), "block tx count", iter.GetBlock().TxCount, "block tx cnt", len(iter.GetBlock().Txs), "txi", len(tx.TxInput), "txo", len(tx.TxOutput), "txCount", cnt)
cnt++
if cnt >= setting.maxNumTxsPerBatch {
break LOOP
}
}
utxoPoolMutex.Unlock()
log.Debug("[Generator] generated transations", "single-shard", len(txs), "cross-shard", len(crossTxs))
return txs, crossTxs
}
func initClient(clientNode *node.Node, clientPort string, shardIDLeaderMap *map[uint32]p2p.Peer, nodes *[]*node.Node) {
if clientPort == "" {
return
}
clientNode.Client = client.NewClient(shardIDLeaderMap)
// This func is used to update the client's utxopool when new blocks are received from the leaders
updateBlocksFunc := func(blocks []*blockchain.Block) {
log.Debug("Received new block from leader", "len", len(blocks))
for _, block := range blocks {
for _, node := range *nodes {
if node.Consensus.ShardID == block.ShardID {
log.Debug("Adding block from leader", "shardID", block.ShardID)
// Add it to blockchain
utxoPoolMutex.Lock()
node.AddNewBlock(block)
utxoPoolMutex.Unlock()
} else {
continue
}
}
}
}
clientNode.Client.UpdateBlocks = updateBlocksFunc
// Start the client server to listen to leader's message
go func() {
clientNode.StartServer(clientPort)
}()
}
func main() {
configFile := flag.String("config_file", "local_config.txt", "file containing all ip addresses and config")
maxNumTxsPerBatch := flag.Int("max_num_txs_per_batch", 10000, "number of transactions to send per message")
logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
flag.Parse()
// Read the configs
config := client_config.NewConfig()
config.ReadConfigFile(*configFile)
shardIDLeaderMap := config.GetShardIDToLeaderMap()
// Do cross shard tx if there are more than one shard
setting.crossShard = len(shardIDLeaderMap) > 1
setting.maxNumTxsPerBatch = *maxNumTxsPerBatch
// TODO(Richard): refactor this chuck to a single method
// Setup a logger to stdout and log file.
logFileName := fmt.Sprintf("./%v/txgen.log", *logFolder)
h := log.MultiHandler(
log.StdoutHandler,
log.Must.FileHandler(logFileName, log.LogfmtFormat()), // Log to file
// log.Must.NetHandler("tcp", ":3000", log.JSONFormat()) // Log to remote
)
log.Root().SetHandler(h)
iter.Init()
utxoMapping = make(map[string]TXRef)
addressMapping = make(map[[20]byte]int)
currentInt = 1 // start from address 1
// Nodes containing utxopools to mirror the shards' data in the network
nodes := []*node.Node{}
for shardID, _ := range shardIDLeaderMap {
node := node.New(&consensus.Consensus{ShardID: shardID}, nil)
// Assign many fake addresses so we have enough address to play with at first
node.AddTestingAddresses(10000)
nodes = append(nodes, node)
}
// Client/txgenerator server node setup
clientPort := config.GetClientPort()
consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{})
clientNode := node.New(consensusObj, nil)
initClient(clientNode, clientPort, &shardIDLeaderMap, &nodes)
// Transaction generation process
time.Sleep(3 * time.Second) // wait for nodes to be ready
leaders := []p2p.Peer{}
for _, leader := range shardIDLeaderMap {
leaders = append(leaders, leader)
}
for {
allCrossTxs := []*blockchain.Transaction{}
// Generate simulated transactions
for shardID, leader := range shardIDLeaderMap {
txs, crossTxs := generateSimulatedTransactions(int(shardID), nodes)
allCrossTxs = append(allCrossTxs, crossTxs...)
log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs), "numCrossTxs", len(crossTxs), "block height", iter.GetBlockIndex())
msg := proto_node.ConstructTransactionListMessage(txs)
p2p.SendMessage(leader, msg)
// Note cross shard txs are later sent in batch
}
if len(allCrossTxs) > 0 {
log.Debug("[Generator] Broadcasting cross-shard txs ...", "allCrossTxs", len(allCrossTxs))
msg := proto_node.ConstructTransactionListMessage(allCrossTxs)
p2p.BroadcastMessage(leaders, msg)
// Put cross shard tx into a pending list waiting for proofs from leaders
if clientPort != "" {
clientNode.Client.PendingCrossTxsMutex.Lock()
for _, tx := range allCrossTxs {
clientNode.Client.PendingCrossTxs[tx.ID] = tx
}
clientNode.Client.PendingCrossTxsMutex.Unlock()
}
}
time.Sleep(500 * time.Millisecond) // Send a batch of transactions periodically
}
// Send a stop message to stop the nodes at the end
msg := proto_node.ConstructStopMessage()
peers := append(config.GetValidators(), leaders...)
p2p.BroadcastMessage(peers, msg)
}

@ -10,23 +10,26 @@ import (
"github.com/simple-rules/harmony-benchmark/p2p" "github.com/simple-rules/harmony-benchmark/p2p"
) )
type ConfigEntry struct { // Entry is a single config of a node.
type Entry struct {
IP string IP string
Port string Port string
Role string Role string
ShardID string ShardID string
} }
// Config is a struct containing multiple Entry of all nodes.
type Config struct { type Config struct {
config []ConfigEntry config []Entry
} }
// NewConfig returns a pointer to a Config.
func NewConfig() *Config { func NewConfig() *Config {
config := Config{} config := Config{}
return &config return &config
} }
// Gets all the validator peers // GetValidators returns all the validator peers
func (config *Config) GetValidators() []p2p.Peer { func (config *Config) GetValidators() []p2p.Peer {
var peerList []p2p.Peer var peerList []p2p.Peer
for _, entry := range config.config { for _, entry := range config.config {
@ -39,7 +42,7 @@ func (config *Config) GetValidators() []p2p.Peer {
return peerList return peerList
} }
// Gets all the leader peers and corresponding shard Ids // GetShardIDToLeaderMap returns all the leader peers and corresponding shard Ids
func (config *Config) GetShardIDToLeaderMap() map[uint32]p2p.Peer { func (config *Config) GetShardIDToLeaderMap() map[uint32]p2p.Peer {
shardIDLeaderMap := map[uint32]p2p.Peer{} shardIDLeaderMap := map[uint32]p2p.Peer{}
for _, entry := range config.config { for _, entry := range config.config {
@ -55,6 +58,7 @@ func (config *Config) GetShardIDToLeaderMap() map[uint32]p2p.Peer {
return shardIDLeaderMap return shardIDLeaderMap
} }
// GetClientPeer returns the client peer.
func (config *Config) GetClientPeer() *p2p.Peer { func (config *Config) GetClientPeer() *p2p.Peer {
for _, entry := range config.config { for _, entry := range config.config {
if entry.Role != "client" { if entry.Role != "client" {
@ -66,7 +70,7 @@ func (config *Config) GetClientPeer() *p2p.Peer {
return nil return nil
} }
// Gets the port of the client node in the config // GetClientPort returns the port of the client node in the config
func (config *Config) GetClientPort() string { func (config *Config) GetClientPort() string {
for _, entry := range config.config { for _, entry := range config.config {
if entry.Role == "client" { if entry.Role == "client" {
@ -76,20 +80,20 @@ func (config *Config) GetClientPort() string {
return "" return ""
} }
// Parse the config file and return a 2d array containing the file data // ReadConfigFile parses the config file and return a 2d array containing the file data
func (config *Config) ReadConfigFile(filename string) error { func (config *Config) ReadConfigFile(filename string) error {
file, err := os.Open(filename) file, err := os.Open(filename)
defer file.Close()
if err != nil { if err != nil {
log.Fatal("Failed to read config file ", filename) log.Fatal("Failed to read config file ", filename)
return err return err
} }
defer file.Close()
fscanner := bufio.NewScanner(file) fscanner := bufio.NewScanner(file)
result := []ConfigEntry{} result := []Entry{}
for fscanner.Scan() { for fscanner.Scan() {
p := strings.Split(fscanner.Text(), " ") p := strings.Split(fscanner.Text(), " ")
entry := ConfigEntry{p[0], p[1], p[2], p[3]} entry := Entry{p[0], p[1], p[2], p[3]}
result = append(result, entry) result = append(result, entry)
} }
config.config = result config.config = result

@ -148,7 +148,7 @@ func generateCrossShardTx(txInfo *TxInfo) {
nodeShardID := txInfo.dataNodes[txInfo.shardID].Consensus.ShardID nodeShardID := txInfo.dataNodes[txInfo.shardID].Consensus.ShardID
crossShardID := nodeShardID crossShardID := nodeShardID
// a random shard to spend money to // a random shard to spend money to
for true { for {
crossShardID = uint32(rand.Intn(len(txInfo.dataNodes))) crossShardID = uint32(rand.Intn(len(txInfo.dataNodes)))
if crossShardID != nodeShardID { if crossShardID != nodeShardID {
break break
@ -335,7 +335,7 @@ func main() {
client.InitLookUpIntPriKeyMap() client.InitLookUpIntPriKeyMap()
subsetCounter := 0 subsetCounter := 0
for true { for {
t := time.Now() t := time.Now()
if totalTime > 0 && t.Sub(start).Seconds() >= totalTime { if totalTime > 0 && t.Sub(start).Seconds() >= totalTime {
log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime) log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime)

@ -275,7 +275,7 @@ func ExecuteTransaction(tx blockchain.Transaction, walletNode *node.Node) error
doneSignal := make(chan int) doneSignal := make(chan int)
go func() { go func() {
for true { for {
if len(walletNode.Client.PendingCrossTxs) == 0 { if len(walletNode.Client.PendingCrossTxs) == 0 {
doneSignal <- 0 doneSignal <- 0
break break
@ -300,7 +300,7 @@ func FetchUtxos(addresses [][20]byte, walletNode *node.Node) (map[uint32]blockch
doneSignal := make(chan int) doneSignal := make(chan int)
go func() { go func() {
for true { for {
if len(walletNode.Client.ShardUtxoMap) == len(*walletNode.Client.Leaders) { if len(walletNode.Client.ShardUtxoMap) == len(*walletNode.Client.Leaders) {
doneSignal <- 0 doneSignal <- 0
break break

@ -18,7 +18,7 @@ import (
// Consensus data containing all info related to one round of consensus process // Consensus data containing all info related to one round of consensus process
type Consensus struct { type Consensus struct {
state ConsensusState state State
// Commits collected from validators. A map from node Id to its commitment // Commits collected from validators. A map from node Id to its commitment
commitments *map[uint16]kyber.Point commitments *map[uint16]kyber.Point
finalCommitments *map[uint16]kyber.Point finalCommitments *map[uint16]kyber.Point
@ -87,8 +87,8 @@ type Consensus struct {
// should be stored in this temporary structure. In case the round N-1 finishes, it can catch // should be stored in this temporary structure. In case the round N-1 finishes, it can catch
// up to the latest state of round N by using this structure. // up to the latest state of round N by using this structure.
type BlockConsensusStatus struct { type BlockConsensusStatus struct {
blockHeader []byte // the block header of the block which the consensus is running on blockHeader []byte // the block header of the block which the consensus is running on
state ConsensusState // the latest state of the consensus state State // the latest state of the consensus
} }
// NewConsensus creates a new Consensus object // NewConsensus creates a new Consensus object

@ -54,9 +54,9 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) {
switch msgType { switch msgType {
case proto_consensus.StartConsensus: case proto_consensus.StartConsensus:
consensus.processStartConsensusMessage(payload) consensus.processStartConsensusMessage(payload)
case proto_consensus.COMMIT: case proto_consensus.Commit:
consensus.processCommitMessage(payload, ChallengeDone) consensus.processCommitMessage(payload, ChallengeDone)
case proto_consensus.RESPONSE: case proto_consensus.Response:
consensus.processResponseMessage(payload, CollectiveSigDone) consensus.processResponseMessage(payload, CollectiveSigDone)
case proto_consensus.FinalCommit: case proto_consensus.FinalCommit:
consensus.processCommitMessage(payload, FinalChallengeDone) consensus.processCommitMessage(payload, FinalChallengeDone)
@ -109,7 +109,7 @@ func (consensus *Consensus) commitByLeader(firstRound bool) {
} }
// processCommitMessage processes the commit message sent from validators // processCommitMessage processes the commit message sent from validators
func (consensus *Consensus) processCommitMessage(payload []byte, targetState ConsensusState) { func (consensus *Consensus) processCommitMessage(payload []byte, targetState State) {
// Read payload data // Read payload data
offset := 0 offset := 0
// 4 byte consensus id // 4 byte consensus id
@ -147,12 +147,12 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Con
consensus.mutex.Lock() consensus.mutex.Lock()
defer consensus.mutex.Unlock() defer consensus.mutex.Unlock()
if consensusID != consensus.consensusID { if consensusID != consensus.consensusID {
consensus.Log.Warn("Received COMMIT with wrong consensus Id", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus) consensus.Log.Warn("Received Commit with wrong consensus Id", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus)
return return
} }
if bytes.Compare(blockHash, consensus.blockHash[:]) != 0 { if bytes.Compare(blockHash, consensus.blockHash[:]) != 0 {
consensus.Log.Warn("Received COMMIT with wrong blockHash", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus) consensus.Log.Warn("Received Commit with wrong blockHash", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus)
return return
} }
@ -186,9 +186,9 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Con
consensus.Log.Debug("Enough commitments received with signatures", "num", len(*commitments), "state", consensus.state) consensus.Log.Debug("Enough commitments received with signatures", "num", len(*commitments), "state", consensus.state)
// Broadcast challenge // Broadcast challenge
msgTypeToSend := proto_consensus.CHALLENGE // targetState == ChallengeDone msgTypeToSend := proto_consensus.Challenge // targetState == ChallengeDone
if targetState == FinalChallengeDone { if targetState == FinalChallengeDone {
msgTypeToSend = proto_consensus.FINAL_CHALLENGE msgTypeToSend = proto_consensus.FinalChallenge
} }
msgToSend, challengeScalar, aggCommitment := consensus.constructChallengeMessage(msgTypeToSend) msgToSend, challengeScalar, aggCommitment := consensus.constructChallengeMessage(msgTypeToSend)
bytes, err := challengeScalar.MarshalBinary() bytes, err := challengeScalar.MarshalBinary()
@ -196,10 +196,10 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Con
log.Error("Failed to serialize challenge") log.Error("Failed to serialize challenge")
} }
if msgTypeToSend == proto_consensus.CHALLENGE { if msgTypeToSend == proto_consensus.Challenge {
copy(consensus.challenge[:], bytes) copy(consensus.challenge[:], bytes)
consensus.aggregatedCommitment = aggCommitment consensus.aggregatedCommitment = aggCommitment
} else if msgTypeToSend == proto_consensus.FINAL_CHALLENGE { } else if msgTypeToSend == proto_consensus.FinalChallenge {
copy(consensus.finalChallenge[:], bytes) copy(consensus.finalChallenge[:], bytes)
consensus.aggregatedFinalCommitment = aggCommitment consensus.aggregatedFinalCommitment = aggCommitment
} }
@ -233,7 +233,7 @@ func (consensus *Consensus) responseByLeader(challenge kyber.Scalar, firstRound
} }
// Processes the response message sent from validators // Processes the response message sent from validators
func (consensus *Consensus) processResponseMessage(payload []byte, targetState ConsensusState) { func (consensus *Consensus) processResponseMessage(payload []byte, targetState State) {
//#### Read payload data //#### Read payload data
offset := 0 offset := 0
// 4 byte consensus id // 4 byte consensus id
@ -264,11 +264,11 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState C
// check consensus Id // check consensus Id
if consensusID != consensus.consensusID { if consensusID != consensus.consensusID {
shouldProcess = false shouldProcess = false
consensus.Log.Warn("Received RESPONSE with wrong consensus Id", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus) consensus.Log.Warn("Received Response with wrong consensus Id", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus)
} }
if bytes.Compare(blockHash, consensus.blockHash[:]) != 0 { if bytes.Compare(blockHash, consensus.blockHash[:]) != 0 {
consensus.Log.Warn("Received RESPONSE with wrong blockHash", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus) consensus.Log.Warn("Received Response with wrong blockHash", "myConsensusId", consensus.consensusID, "theirConsensusId", consensusID, "consensus", consensus)
return return
} }

@ -35,7 +35,7 @@ func (consensus *Consensus) constructAnnounceMessage() []byte {
signature := consensus.signMessage(buffer.Bytes()) signature := consensus.signMessage(buffer.Bytes())
buffer.Write(signature) buffer.Write(signature)
return proto_consensus.ConstructConsensusMessage(proto_consensus.ANNOUNCE, buffer.Bytes()) return proto_consensus.ConstructConsensusMessage(proto_consensus.Announce, buffer.Bytes())
} }
// Construct the challenge message, returning challenge message in bytes, challenge scalar and aggregated commmitment point. // Construct the challenge message, returning challenge message in bytes, challenge scalar and aggregated commmitment point.
@ -55,9 +55,9 @@ func (consensus *Consensus) constructChallengeMessage(msgTypeToSend proto_consen
binary.BigEndian.PutUint16(twoBytes, consensus.nodeID) binary.BigEndian.PutUint16(twoBytes, consensus.nodeID)
buffer.Write(twoBytes) buffer.Write(twoBytes)
commitmentsMap := consensus.commitments // msgType == CHALLENGE commitmentsMap := consensus.commitments // msgType == Challenge
bitmap := consensus.bitmap bitmap := consensus.bitmap
if msgTypeToSend == proto_consensus.FINAL_CHALLENGE { if msgTypeToSend == proto_consensus.FinalChallenge {
commitmentsMap = consensus.finalCommitments commitmentsMap = consensus.finalCommitments
bitmap = consensus.finalBitmap bitmap = consensus.finalBitmap
} }
@ -115,7 +115,7 @@ func (consensus *Consensus) constructCollectiveSigMessage(collectiveSig [64]byte
signature := consensus.signMessage(buffer.Bytes()) signature := consensus.signMessage(buffer.Bytes())
buffer.Write(signature) buffer.Write(signature)
return proto_consensus.ConstructConsensusMessage(proto_consensus.COLLECTIVE_SIG, buffer.Bytes()) return proto_consensus.ConstructConsensusMessage(proto_consensus.CollectiveSig, buffer.Bytes())
} }
func getAggregatedCommit(commitments []kyber.Point) (commitment kyber.Point, bytes []byte) { func getAggregatedCommit(commitments []kyber.Point) (commitment kyber.Point, bytes []byte) {

@ -42,7 +42,7 @@ func TestConstructChallengeMessage(test *testing.T) {
consensus.bitmap.SetKey(leaderPubKey, true) consensus.bitmap.SetKey(leaderPubKey, true)
consensus.bitmap.SetKey(validatorPubKey, true) consensus.bitmap.SetKey(validatorPubKey, true)
msg, _, _ := consensus.constructChallengeMessage(consensus_proto.CHALLENGE) msg, _, _ := consensus.constructChallengeMessage(consensus_proto.Challenge)
if len(msg) != 1+1+1+4+32+2+33+33+32+64 { if len(msg) != 1+1+1+4+32+2+33+33+32+64 {
test.Errorf("Annouce message is not constructed in the correct size: %d", len(msg)) test.Errorf("Annouce message is not constructed in the correct size: %d", len(msg))

@ -1,15 +1,15 @@
package consensus package consensus
// ConsensusState is the consensus state enum for both leader and validator // State is the consensus state enum for both leader and validator
// States for leader: // States for leader:
// Finished, AnnounceDone, ChallengeDone // Finished, AnnounceDone, ChallengeDone
// States for validator: // States for validator:
// Finished, CommitDone, ResponseDone // Finished, CommitDone, ResponseDone
type ConsensusState int type State int
// Followings are the set of states of validators or leaders during consensus. // Followings are the set of states of validators or leaders during consensus.
const ( const (
Finished ConsensusState = iota Finished State = iota
AnnounceDone AnnounceDone
CommitDone CommitDone
ChallengeDone ChallengeDone
@ -20,8 +20,8 @@ const (
FinalResponseDone FinalResponseDone
) )
// Returns string name for the ConsensusState enum // Returns string name for the State enum
func (state ConsensusState) String() string { func (state State) String() string {
names := [...]string{ names := [...]string{
"Finished", "Finished",
"AnnounceDone", "AnnounceDone",

@ -29,13 +29,13 @@ func (consensus *Consensus) ProcessMessageValidator(message []byte) {
consensus.Log.Info("Received consensus Message", "type", msgType) consensus.Log.Info("Received consensus Message", "type", msgType)
switch msgType { switch msgType {
case proto_consensus.ANNOUNCE: case proto_consensus.Announce:
consensus.processAnnounceMessage(payload) consensus.processAnnounceMessage(payload)
case proto_consensus.CHALLENGE: case proto_consensus.Challenge:
consensus.processChallengeMessage(payload, ResponseDone) consensus.processChallengeMessage(payload, ResponseDone)
case proto_consensus.FINAL_CHALLENGE: case proto_consensus.FinalChallenge:
consensus.processChallengeMessage(payload, FinalResponseDone) consensus.processChallengeMessage(payload, FinalResponseDone)
case proto_consensus.COLLECTIVE_SIG: case proto_consensus.CollectiveSig:
consensus.processCollectiveSigMessage(payload) consensus.processCollectiveSigMessage(payload)
default: default:
consensus.Log.Error("Unexpected message type", "msgType", msgType, "consensus", consensus) consensus.Log.Error("Unexpected message type", "msgType", msgType, "consensus", consensus)
@ -111,7 +111,7 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) {
} }
// check block hash // check block hash
if bytes.Compare(blockHash[:], blockHeaderObj.CalculateBlockHash()[:]) != 0 || bytes.Compare(blockHeaderObj.Hash[:], blockHeaderObj.CalculateBlockHash()[:]) != 0 { if !bytes.Equal(blockHash[:], blockHeaderObj.CalculateBlockHash()[:]) || !bytes.Equal(blockHeaderObj.Hash[:], blockHeaderObj.CalculateBlockHash()[:]) {
consensus.Log.Warn("Block hash doesn't match", "consensus", consensus) consensus.Log.Warn("Block hash doesn't match", "consensus", consensus)
return return
} }
@ -122,7 +122,7 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) {
return return
} }
secret, msgToSend := consensus.constructCommitMessage(proto_consensus.COMMIT) secret, msgToSend := consensus.constructCommitMessage(proto_consensus.Commit)
// Store the commitment secret // Store the commitment secret
consensus.secret[consensusID] = secret consensus.secret[consensusID] = secret
@ -134,7 +134,7 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) {
} }
// Processes the challenge message sent from the leader // Processes the challenge message sent from the leader
func (consensus *Consensus) processChallengeMessage(payload []byte, targetState ConsensusState) { func (consensus *Consensus) processChallengeMessage(payload []byte, targetState State) {
//#### Read payload data //#### Read payload data
offset := 0 offset := 0
// 4 byte consensus id // 4 byte consensus id
@ -193,7 +193,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte, targetState
defer consensus.mutex.Unlock() defer consensus.mutex.Unlock()
// check block hash // check block hash
if bytes.Compare(blockHash[:], consensus.blockHash[:]) != 0 { if !bytes.Equal(blockHash[:], consensus.blockHash[:]) {
consensus.Log.Warn("Block hash doesn't match", "consensus", consensus) consensus.Log.Warn("Block hash doesn't match", "consensus", consensus)
return return
} }
@ -238,7 +238,7 @@ func (consensus *Consensus) processChallengeMessage(payload []byte, targetState
log.Warn("Failed to generate response", "err", err) log.Warn("Failed to generate response", "err", err)
return return
} }
msgTypeToSend := proto_consensus.RESPONSE msgTypeToSend := proto_consensus.Response
if targetState == FinalResponseDone { if targetState == FinalResponseDone {
msgTypeToSend = proto_consensus.FinalResponse msgTypeToSend = proto_consensus.FinalResponse
} }
@ -353,7 +353,7 @@ func (consensus *Consensus) processCollectiveSigMessage(payload []byte) {
} }
// check block hash // check block hash
if bytes.Compare(blockHash[:], consensus.blockHash[:]) != 0 { if !bytes.Equal(blockHash[:], consensus.blockHash[:]) {
consensus.Log.Warn("Block hash doesn't match", "consensus", consensus) consensus.Log.Warn("Block hash doesn't match", "consensus", consensus)
return return
} }

@ -13,7 +13,7 @@ func TestConstructCommitMessage(test *testing.T) {
validator := p2p.Peer{Ip: "3", Port: "5"} validator := p2p.Peer{Ip: "3", Port: "5"}
consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
consensus.blockHash = [32]byte{} consensus.blockHash = [32]byte{}
_, msg := consensus.constructCommitMessage(consensus_proto.COMMIT) _, msg := consensus.constructCommitMessage(consensus_proto.Commit)
if len(msg) != 1+1+1+4+32+2+32+64 { if len(msg) != 1+1+1+4+32+2+32+64 {
test.Errorf("Commit message is not constructed in the correct size: %d", len(msg)) test.Errorf("Commit message is not constructed in the correct size: %d", len(msg))
@ -25,7 +25,7 @@ func TestConstructResponseMessage(test *testing.T) {
validator := p2p.Peer{Ip: "3", Port: "5"} validator := p2p.Peer{Ip: "3", Port: "5"}
consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader) consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
consensus.blockHash = [32]byte{} consensus.blockHash = [32]byte{}
msg := consensus.constructResponseMessage(consensus_proto.RESPONSE, crypto.Ed25519Curve.Scalar()) msg := consensus.constructResponseMessage(consensus_proto.Response, crypto.Ed25519Curve.Scalar())
if len(msg) != 1+1+1+4+32+2+32+64 { if len(msg) != 1+1+1+4+32+2+32+64 {
test.Errorf("Response message is not constructed in the correct size: %d", len(msg)) test.Errorf("Response message is not constructed in the correct size: %d", len(msg))

@ -2,4 +2,5 @@ package crypto
import "github.com/dedis/kyber/group/edwards25519" import "github.com/dedis/kyber/group/edwards25519"
// Ed25519Curve value gets initialized.
var Ed25519Curve = edwards25519.NewBlakeSHA256Ed25519() var Ed25519Curve = edwards25519.NewBlakeSHA256Ed25519()

@ -2,6 +2,7 @@ package crypto
import "crypto/sha256" import "crypto/sha256"
// HashSha256 returns result of Sha256.
func HashSha256(message string) [32]byte { func HashSha256(message string) [32]byte {
return sha256.Sum256([]byte(message)) return sha256.Sum256([]byte(message))
} }

@ -17,8 +17,7 @@ const (
writePauseWarningThrottler = 1 * time.Minute writePauseWarningThrottler = 1 * time.Minute
) )
var OpenFileLimit = 64 // LDBDatabase is database based on leveldb.
type LDBDatabase struct { type LDBDatabase struct {
fn string // filename for reporting fn string // filename for reporting
db *leveldb.DB // LevelDB instance db *leveldb.DB // LevelDB instance
@ -73,6 +72,7 @@ func (db *LDBDatabase) Put(key []byte, value []byte) error {
return db.db.Put(key, value, nil) return db.db.Put(key, value, nil)
} }
// Has is used to check if the given key is included into the database.
func (db *LDBDatabase) Has(key []byte) (bool, error) { func (db *LDBDatabase) Has(key []byte) (bool, error) {
return db.db.Has(key, nil) return db.db.Has(key, nil)
} }
@ -91,6 +91,7 @@ func (db *LDBDatabase) Delete(key []byte) error {
return db.db.Delete(key, nil) return db.db.Delete(key, nil)
} }
// NewIterator returns the current iterator of the db.
func (db *LDBDatabase) NewIterator() iterator.Iterator { func (db *LDBDatabase) NewIterator() iterator.Iterator {
return db.db.NewIterator(nil, nil) return db.db.NewIterator(nil, nil)
} }
@ -100,6 +101,7 @@ func (db *LDBDatabase) NewIteratorWithPrefix(prefix []byte) iterator.Iterator {
return db.db.NewIterator(util.BytesPrefix(prefix), nil) return db.db.NewIterator(util.BytesPrefix(prefix), nil)
} }
// Close closes the database.
func (db *LDBDatabase) Close() { func (db *LDBDatabase) Close() {
// Stop the metrics collection to avoid internal database races // Stop the metrics collection to avoid internal database races
db.quitLock.Lock() db.quitLock.Lock()
@ -121,12 +123,15 @@ func (db *LDBDatabase) Close() {
} }
} }
// LDB returns the pointer to leveldb on which the LDBDatabase is built.
func (db *LDBDatabase) LDB() *leveldb.DB { func (db *LDBDatabase) LDB() *leveldb.DB {
return db.db return db.db
} }
// TODO(minhdoan): Might add meter func from ethereum-go repo /* TODO(minhdoan): Might add meter func from ethereum-go repo
*/
// NewBatch returns Batch interface for a series of leveldb transactions.
func (db *LDBDatabase) NewBatch() Batch { func (db *LDBDatabase) NewBatch() Batch {
return &ldbBatch{db: db.db, b: new(leveldb.Batch)} return &ldbBatch{db: db.db, b: new(leveldb.Batch)}
} }
@ -137,26 +142,31 @@ type ldbBatch struct {
size int size int
} }
// Put is used to put key, value into the batch of transactions.
func (b *ldbBatch) Put(key, value []byte) error { func (b *ldbBatch) Put(key, value []byte) error {
b.b.Put(key, value) b.b.Put(key, value)
b.size += len(value) b.size += len(value)
return nil return nil
} }
// Delete is used to delete the item associated with the given key as a part of the batch.
func (b *ldbBatch) Delete(key []byte) error { func (b *ldbBatch) Delete(key []byte) error {
b.b.Delete(key) b.b.Delete(key)
b.size += 1 b.size++
return nil return nil
} }
// Write writes the patch of transactions.
func (b *ldbBatch) Write() error { func (b *ldbBatch) Write() error {
return b.db.Write(b.b, nil) return b.db.Write(b.b, nil)
} }
// ValueSize returns the size of the patch.
func (b *ldbBatch) ValueSize() int { func (b *ldbBatch) ValueSize() int {
return b.size return b.size
} }
// Reset resets the batch.
func (b *ldbBatch) Reset() { func (b *ldbBatch) Reset() {
b.b.Reset() b.b.Reset()
b.size = 0 b.size = 0

@ -26,7 +26,7 @@ func newTestLDB() (*LDBDatabase, func()) {
} }
} }
var test_values = []string{"", "a", "1251", "\x00123\x00"} var testValues = []string{"", "a", "1251", "\x00123\x00"}
func TestLDB_PutGet(t *testing.T) { func TestLDB_PutGet(t *testing.T) {
db, remove := newTestLDB() db, remove := newTestLDB()
@ -41,14 +41,14 @@ func TestMemoryDB_PutGet(t *testing.T) {
func testPutGet(db Database, t *testing.T) { func testPutGet(db Database, t *testing.T) {
t.Parallel() t.Parallel()
for _, k := range test_values { for _, k := range testValues {
err := db.Put([]byte(k), nil) err := db.Put([]byte(k), nil)
if err != nil { if err != nil {
t.Fatalf("put failed: %v", err) t.Fatalf("put failed: %v", err)
} }
} }
for _, k := range test_values { for _, k := range testValues {
data, err := db.Get([]byte(k)) data, err := db.Get([]byte(k))
if err != nil { if err != nil {
t.Fatalf("get failed: %v", err) t.Fatalf("get failed: %v", err)
@ -63,14 +63,14 @@ func testPutGet(db Database, t *testing.T) {
t.Fatalf("expect to return a not found error") t.Fatalf("expect to return a not found error")
} }
for _, v := range test_values { for _, v := range testValues {
err := db.Put([]byte(v), []byte(v)) err := db.Put([]byte(v), []byte(v))
if err != nil { if err != nil {
t.Fatalf("put failed: %v", err) t.Fatalf("put failed: %v", err)
} }
} }
for _, v := range test_values { for _, v := range testValues {
data, err := db.Get([]byte(v)) data, err := db.Get([]byte(v))
if err != nil { if err != nil {
t.Fatalf("get failed: %v", err) t.Fatalf("get failed: %v", err)
@ -80,14 +80,14 @@ func testPutGet(db Database, t *testing.T) {
} }
} }
for _, v := range test_values { for _, v := range testValues {
err := db.Put([]byte(v), []byte("?")) err := db.Put([]byte(v), []byte("?"))
if err != nil { if err != nil {
t.Fatalf("put override failed: %v", err) t.Fatalf("put override failed: %v", err)
} }
} }
for _, v := range test_values { for _, v := range testValues {
data, err := db.Get([]byte(v)) data, err := db.Get([]byte(v))
if err != nil { if err != nil {
t.Fatalf("get failed: %v", err) t.Fatalf("get failed: %v", err)
@ -97,7 +97,7 @@ func testPutGet(db Database, t *testing.T) {
} }
} }
for _, v := range test_values { for _, v := range testValues {
orig, err := db.Get([]byte(v)) orig, err := db.Get([]byte(v))
if err != nil { if err != nil {
t.Fatalf("get failed: %v", err) t.Fatalf("get failed: %v", err)
@ -112,14 +112,14 @@ func testPutGet(db Database, t *testing.T) {
} }
} }
for _, v := range test_values { for _, v := range testValues {
err := db.Delete([]byte(v)) err := db.Delete([]byte(v))
if err != nil { if err != nil {
t.Fatalf("delete %q failed: %v", v, err) t.Fatalf("delete %q failed: %v", v, err)
} }
} }
for _, v := range test_values { for _, v := range testValues {
_, err := db.Get([]byte(v)) _, err := db.Get([]byte(v))
if err == nil { if err == nil {
t.Fatalf("got deleted value %q", v) t.Fatalf("got deleted value %q", v)

@ -1,6 +1,6 @@
package db package db
// Code using batches should try to add this much data to the batch. // IdealBatchSize is the max size of batch transactions.
// The value was determined empirically. // The value was determined empirically.
const IdealBatchSize = 100 * 1024 const IdealBatchSize = 100 * 1024

@ -7,26 +7,27 @@ import (
"github.com/simple-rules/harmony-benchmark/utils" "github.com/simple-rules/harmony-benchmark/utils"
) )
/* // MemDatabase is the test memory database. It won't be used for any production.
* This is a test memory database. Do not use for any production it does not get persisted
*/
type MemDatabase struct { type MemDatabase struct {
db map[string][]byte db map[string][]byte
lock sync.RWMutex lock sync.RWMutex
} }
// NewMemDatabase returns a pointer of the new creation of MemDatabase.
func NewMemDatabase() *MemDatabase { func NewMemDatabase() *MemDatabase {
return &MemDatabase{ return &MemDatabase{
db: make(map[string][]byte), db: make(map[string][]byte),
} }
} }
// NewMemDatabaseWithCap returns a pointer of the new creation of MemDatabase with the given size.
func NewMemDatabaseWithCap(size int) *MemDatabase { func NewMemDatabaseWithCap(size int) *MemDatabase {
return &MemDatabase{ return &MemDatabase{
db: make(map[string][]byte, size), db: make(map[string][]byte, size),
} }
} }
// Put puts (key, value) item into MemDatabase.
func (db *MemDatabase) Put(key []byte, value []byte) error { func (db *MemDatabase) Put(key []byte, value []byte) error {
db.lock.Lock() db.lock.Lock()
defer db.lock.Unlock() defer db.lock.Unlock()
@ -35,6 +36,7 @@ func (db *MemDatabase) Put(key []byte, value []byte) error {
return nil return nil
} }
// Has checks if the key is included into MemDatabase.
func (db *MemDatabase) Has(key []byte) (bool, error) { func (db *MemDatabase) Has(key []byte) (bool, error) {
db.lock.RLock() db.lock.RLock()
defer db.lock.RUnlock() defer db.lock.RUnlock()
@ -43,6 +45,7 @@ func (db *MemDatabase) Has(key []byte) (bool, error) {
return ok, nil return ok, nil
} }
// Get gets value of the given key.
func (db *MemDatabase) Get(key []byte) ([]byte, error) { func (db *MemDatabase) Get(key []byte) ([]byte, error) {
db.lock.RLock() db.lock.RLock()
defer db.lock.RUnlock() defer db.lock.RUnlock()
@ -53,6 +56,7 @@ func (db *MemDatabase) Get(key []byte) ([]byte, error) {
return nil, errors.New("not found") return nil, errors.New("not found")
} }
// Keys returns all keys of the given MemDatabase.
func (db *MemDatabase) Keys() [][]byte { func (db *MemDatabase) Keys() [][]byte {
db.lock.RLock() db.lock.RLock()
defer db.lock.RUnlock() defer db.lock.RUnlock()
@ -64,6 +68,7 @@ func (db *MemDatabase) Keys() [][]byte {
return keys return keys
} }
// Delete deletes the given key.
func (db *MemDatabase) Delete(key []byte) error { func (db *MemDatabase) Delete(key []byte) error {
db.lock.Lock() db.lock.Lock()
defer db.lock.Unlock() defer db.lock.Unlock()
@ -72,12 +77,15 @@ func (db *MemDatabase) Delete(key []byte) error {
return nil return nil
} }
// Close closes the given db.
func (db *MemDatabase) Close() {} func (db *MemDatabase) Close() {}
// NewBatch returns a batch of MemDatabase transactions.
func (db *MemDatabase) NewBatch() Batch { func (db *MemDatabase) NewBatch() Batch {
return &memBatch{db: db} return &memBatch{db: db}
} }
// Len returns the length of the given db.
func (db *MemDatabase) Len() int { return len(db.db) } func (db *MemDatabase) Len() int { return len(db.db) }
type kv struct { type kv struct {
@ -99,7 +107,7 @@ func (b *memBatch) Put(key, value []byte) error {
func (b *memBatch) Delete(key []byte) error { func (b *memBatch) Delete(key []byte) error {
b.writes = append(b.writes, kv{utils.CopyBytes(key), nil, true}) b.writes = append(b.writes, kv{utils.CopyBytes(key), nil, true})
b.size += 1 b.size++
return nil return nil
} }

@ -30,7 +30,7 @@ func (b *IdentityBlock) Serialize() []byte {
return result.Bytes() return result.Bytes()
} }
//Get Identities // GetIdentities returns a list of identities.
func (b *IdentityBlock) GetIdentities() []*node.Node { func (b *IdentityBlock) GetIdentities() []*node.Node {
return b.Identities return b.Identities
} }

@ -57,7 +57,7 @@ func (IDC *IdentityChain) IdentityChainHandler(conn net.Conn) {
switch idMsgType { switch idMsgType {
case proto_identity.REGISTER: case proto_identity.REGISTER:
IDC.registerIdentity(msgPayload) IDC.registerIdentity(msgPayload)
case proto_identity.ANNOUNCE: case proto_identity.Announce:
IDC.acceptNewConnection(msgPayload) IDC.acceptNewConnection(msgPayload)
} }
@ -107,8 +107,6 @@ func (IDC *IdentityChain) acceptNewConnection(msgPayload []byte) {
challengeNonce := int((rnd.Int31())) challengeNonce := int((rnd.Int31()))
req := pow.NewRequest(5, []byte(strconv.Itoa(challengeNonce))) req := pow.NewRequest(5, []byte(strconv.Itoa(challengeNonce)))
IDC.PowMap[Node.Self] = req IDC.PowMap[Node.Self] = req
fmt.Println(Node.Self)
fmt.Println(req)
buffer.Write([]byte(req)) buffer.Write([]byte(req))
// 32 byte block hash // 32 byte block hash
// buffer.Write(prevBlockHash) // buffer.Write(prevBlockHash)

@ -10,4 +10,5 @@ import "syscall"
const ioctlReadTermios = syscall.TIOCGETA const ioctlReadTermios = syscall.TIOCGETA
// Termios is syscall.Termios.
type Termios syscall.Termios type Termios syscall.Termios

@ -126,8 +126,7 @@ func (node *Node) countNumTransactionsInBlockchain() int {
//ConnectIdentityChain connects to identity chain //ConnectIdentityChain connects to identity chain
func (node *Node) ConnectIdentityChain() { func (node *Node) ConnectIdentityChain() {
IDCPeer := node.IDCPeer IDCPeer := node.IDCPeer
p2p.SendMessage(IDCPeer, identity.ConstructIdentityMessage(identity.ANNOUNCE, node.SerializeWaitNode())) p2p.SendMessage(IDCPeer, identity.ConstructIdentityMessage(identity.Announce, node.SerializeWaitNode()))
return
} }
//NewWaitNode is a way to initiate a waiting no //NewWaitNode is a way to initiate a waiting no

@ -78,14 +78,14 @@ func (node *Node) NodeHandler(conn net.Conn) {
case proto_identity.REGISTER: case proto_identity.REGISTER:
fmt.Println("received a identity message") fmt.Println("received a identity message")
node.processPOWMessage(msgPayload) node.processPOWMessage(msgPayload)
case proto_identity.ANNOUNCE: case proto_identity.Announce:
node.log.Error("Announce message should be sent to IdentityChain") node.log.Error("Announce message should be sent to IdentityChain")
} }
} }
case proto.CONSENSUS: case proto.Consensus:
actionType := consensus.ConsensusMessageType(msgType) actionType := consensus.ConsensusMessageType(msgType)
switch actionType { switch actionType {
case consensus.CONSENSUS: case consensus.Consensus:
if consensusObj.IsLeader { if consensusObj.IsLeader {
consensusObj.ProcessMessageLeader(msgPayload) consensusObj.ProcessMessageLeader(msgPayload)
} else { } else {
@ -100,8 +100,8 @@ func (node *Node) NodeHandler(conn net.Conn) {
case proto_node.BLOCK: case proto_node.BLOCK:
blockMsgType := proto_node.BlockMessageType(msgPayload[0]) blockMsgType := proto_node.BlockMessageType(msgPayload[0])
switch blockMsgType { switch blockMsgType {
case proto_node.SYNC: case proto_node.Sync:
decoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the SYNC messge type decoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the Sync messge type
blocks := new([]*blockchain.Block) blocks := new([]*blockchain.Block)
decoder.Decode(blocks) decoder.Decode(blocks)
if node.Client != nil && node.Client.UpdateBlocks != nil && blocks != nil { if node.Client != nil && node.Client.UpdateBlocks != nil && blocks != nil {
@ -199,7 +199,7 @@ FOR_LOOP:
} }
w.Write(proto_node.SerializeBlockchainSyncMessage(&blockchainSyncMessage)) w.Write(proto_node.SerializeBlockchainSyncMessage(&blockchainSyncMessage))
w.Flush() w.Flush()
case proto_node.DONE: case proto_node.Done:
break FOR_LOOP break FOR_LOOP
} }
content, err := p2p.ReadMessageContent(conn) content, err := p2p.ReadMessageContent(conn)
@ -235,15 +235,15 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) {
txMessageType := proto_node.TransactionMessageType(msgPayload[0]) txMessageType := proto_node.TransactionMessageType(msgPayload[0])
switch txMessageType { switch txMessageType {
case proto_node.SEND: case proto_node.Send:
txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the SEND messge type txDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the Send messge type
txList := new([]*blockchain.Transaction) txList := new([]*blockchain.Transaction)
err := txDecoder.Decode(txList) err := txDecoder.Decode(txList)
if err != nil { if err != nil {
node.log.Error("Failed to deserialize transaction list", "error", err) node.log.Error("Failed to deserialize transaction list", "error", err)
} }
node.addPendingTransactions(*txList) node.addPendingTransactions(*txList)
case proto_node.REQUEST: case proto_node.Request:
reader := bytes.NewBuffer(msgPayload[1:]) reader := bytes.NewBuffer(msgPayload[1:])
var txIDs map[[32]byte]bool var txIDs map[[32]byte]bool
buf := make([]byte, 32) // 32 byte hash Id buf := make([]byte, 32) // 32 byte hash Id
@ -265,15 +265,15 @@ func (node *Node) transactionMessageHandler(msgPayload []byte) {
} }
} }
// TODO: return the transaction list to requester // TODO: return the transaction list to requester
case proto_node.UNLOCK: case proto_node.Unlock:
txAndProofDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the UNLOCK messge type txAndProofDecoder := gob.NewDecoder(bytes.NewReader(msgPayload[1:])) // skip the Unlock messge type
txAndProofs := new([]*blockchain.Transaction) txAndProofs := new([]*blockchain.Transaction)
err := txAndProofDecoder.Decode(&txAndProofs) err := txAndProofDecoder.Decode(&txAndProofs)
if err != nil { if err != nil {
node.log.Error("Failed deserializing transaction and proofs list", "node", node) node.log.Error("Failed deserializing transaction and proofs list", "node", node)
} }
node.log.Debug("RECEIVED UNLOCK MESSAGE", "num", len(*txAndProofs)) node.log.Debug("RECEIVED Unlock MESSAGE", "num", len(*txAndProofs))
node.addPendingTransactions(*txAndProofs) node.addPendingTransactions(*txAndProofs)
} }

@ -48,7 +48,7 @@ func BroadcastMessage(peers []Peer, msg []byte) {
peerCopy := peer peerCopy := peer
go send(peerCopy.Ip, peerCopy.Port, content) go send(peerCopy.Ip, peerCopy.Port, content)
} }
log.Info("Broadcasting Done", "time spent(s)", time.Now().Sub(start).Seconds()) log.Info("Broadcasting Done", "time spent(s)", time.Since(start).Seconds())
} }
func SelectMyPeers(peers []Peer, min int, max int) []Peer { func SelectMyPeers(peers []Peer, min int, max int) []Peer {

@ -77,10 +77,7 @@ func (req *Request) UnmarshalText(buf []byte) error {
} }
req.Difficulty = uint32(diff) req.Difficulty = uint32(diff)
req.Nonce, err = base64.RawStdEncoding.DecodeString(bits[2]) req.Nonce, err = base64.RawStdEncoding.DecodeString(bits[2])
if err != nil { return err
return err
}
return nil
} }
// Convenience function to check whether a proof of work is fulfilled // Convenience function to check whether a proof of work is fulfilled

@ -9,10 +9,10 @@ The message structure of any message in Harmony network
---- content start ----- ---- content start -----
1 byte - message category 1 byte - message category
0x00: CONSENSUS 0x00: Consensus
0x01: NODE... 0x01: NODE...
1 byte - message type 1 byte - message type
- for CONSENSUS category - for Consensus category
0x00: consensus 0x00: consensus
0x01: sharding ... 0x01: sharding ...
- for NODE category - for NODE category
@ -24,41 +24,41 @@ n - 2 bytes - actual message payload
// The message category enum // The message category enum
type MessageCategory byte type MessageCategory byte
//CONSENSUS and other message categories //Consensus and other message categories
const ( const (
CONSENSUS MessageCategory = iota Consensus MessageCategory = iota
NODE NODE
CLIENT CLIENT
IDENTITY IDENTITY
// TODO: add more types // TODO: add more types
) )
// MESSAGE_CATEGORY_BYTES is the number of bytes message category takes // MessageCategoryBytes is the number of bytes message category takes
const MESSAGE_CATEGORY_BYTES = 1 const MessageCategoryBytes = 1
// MESSAGE_TYPE_BYTES is the number of bytes message type takes // MessageTypeBytes is the number of bytes message type takes
const MESSAGE_TYPE_BYTES = 1 const MessageTypeBytes = 1
// Get the message category from the p2p message content // Get the message category from the p2p message content
func GetMessageCategory(message []byte) (MessageCategory, error) { func GetMessageCategory(message []byte) (MessageCategory, error) {
if len(message) < MESSAGE_CATEGORY_BYTES { if len(message) < MessageCategoryBytes {
return 0, errors.New("Failed to get message category: no data available.") return 0, errors.New("Failed to get message category: no data available.")
} }
return MessageCategory(message[MESSAGE_CATEGORY_BYTES-1]), nil return MessageCategory(message[MessageCategoryBytes-1]), nil
} }
// Get the message type from the p2p message content // Get the message type from the p2p message content
func GetMessageType(message []byte) (byte, error) { func GetMessageType(message []byte) (byte, error) {
if len(message) < MESSAGE_CATEGORY_BYTES+MESSAGE_TYPE_BYTES { if len(message) < MessageCategoryBytes+MessageTypeBytes {
return 0, errors.New("Failed to get message type: no data available.") return 0, errors.New("Failed to get message type: no data available.")
} }
return byte(message[MESSAGE_CATEGORY_BYTES+MESSAGE_TYPE_BYTES-1]), nil return byte(message[MessageCategoryBytes+MessageTypeBytes-1]), nil
} }
// Get the node message payload from the p2p message content // Get the node message payload from the p2p message content
func GetMessagePayload(message []byte) ([]byte, error) { func GetMessagePayload(message []byte) ([]byte, error) {
if len(message) < MESSAGE_CATEGORY_BYTES+MESSAGE_TYPE_BYTES { if len(message) < MessageCategoryBytes+MessageTypeBytes {
return []byte{}, errors.New("Failed to get message payload: no data available.") return []byte{}, errors.New("Failed to get message payload: no data available.")
} }
return message[MESSAGE_CATEGORY_BYTES+MESSAGE_TYPE_BYTES:], nil return message[MessageCategoryBytes+MessageTypeBytes:], nil
} }

@ -12,11 +12,11 @@ Consensus message is the payload of p2p message.
Consensus message data structure: Consensus message data structure:
ANNOUNCE: Announce:
---- message start ----- ---- message start -----
1 byte - consensus.MessageType 1 byte - consensus.MessageType
0x00 - ANNOUNCE 0x00 - Announce
0x01 - COMMIT 0x01 - Commit
... ...
4 byte - consensus id 4 byte - consensus id
32 byte - block hash 32 byte - block hash
@ -26,11 +26,11 @@ ANNOUNCE:
64 byte - signature 64 byte - signature
---- message end ----- ---- message end -----
COMMIT: Commit:
---- message start ----- ---- message start -----
1 byte - consensus.MessageType 1 byte - consensus.MessageType
0x00 - ANNOUNCE 0x00 - Announce
0x01 - COMMIT 0x01 - Commit
... ...
4 byte - consensus id 4 byte - consensus id
32 byte - block hash 32 byte - block hash
@ -39,11 +39,11 @@ COMMIT:
64 byte - signature 64 byte - signature
---- message end ----- ---- message end -----
CHALLENGE: Challenge:
---- message start ----- ---- message start -----
1 byte - consensus.MessageType 1 byte - consensus.MessageType
0x00 - ANNOUNCE 0x00 - Announce
0x01 - COMMIT 0x01 - Commit
... ...
4 byte - consensus id 4 byte - consensus id
32 byte - block hash 32 byte - block hash
@ -54,11 +54,11 @@ CHALLENGE:
64 byte - signature 64 byte - signature
---- message end ----- ---- message end -----
RESPONSE: Response:
---- message start ----- ---- message start -----
1 byte - consensus.MessageType 1 byte - consensus.MessageType
0x00 - ANNOUNCE 0x00 - Announce
0x01 - COMMIT 0x01 - Commit
... ...
4 byte - consensus id 4 byte - consensus id
32 byte - block hash 32 byte - block hash
@ -68,29 +68,31 @@ RESPONSE:
---- message end ----- ---- message end -----
*/ */
// the number of bytes consensus message type occupies // MessageTypeBytes is the number of bytes consensus message type occupies
const ConsensusMessageTypeBytes = 1 const MessageTypeBytes = 1
// The specific types of message under CONSENSUS category // ConsensusMessageType is the specific types of message under Consensus category
type ConsensusMessageType byte type ConsensusMessageType byte
// Consensus message type constants.
const ( const (
CONSENSUS ConsensusMessageType = iota Consensus ConsensusMessageType = iota
// TODO: add more types // TODO: add more types
) )
// Consensus communication message type. // MessageType is the consensus communication message type.
// Leader and validator dispatch messages based on incoming message type // Leader and validator dispatch messages based on incoming message type
type MessageType int type MessageType int
// Message type constants.
const ( const (
ANNOUNCE MessageType = iota Announce MessageType = iota
COMMIT Commit
CHALLENGE Challenge
RESPONSE Response
COLLECTIVE_SIG CollectiveSig
FinalCommit FinalCommit
FINAL_CHALLENGE FinalChallenge
FinalResponse FinalResponse
StartConsensus StartConsensus
) )
@ -98,18 +100,18 @@ const (
// Returns string name for the MessageType enum // Returns string name for the MessageType enum
func (msgType MessageType) String() string { func (msgType MessageType) String() string {
names := [...]string{ names := [...]string{
"ANNOUNCE", "Announce",
"COMMIT", "Commit",
"CHALLENGE", "Challenge",
"RESPONSE", "Response",
"COLLECTIVE_SIG", "CollectiveSig",
"FinalCommit", "FinalCommit",
"FINAL_CHALLENGE", "FinalChallenge",
"FinalResponse", "FinalResponse",
"StartConsensus", "StartConsensus",
} }
if msgType < ANNOUNCE || msgType > StartConsensus { if msgType < Announce || msgType > StartConsensus {
return "Unknown" return "Unknown"
} }
return names[msgType] return names[msgType]
@ -128,13 +130,13 @@ func GetConsensusMessagePayload(message []byte) ([]byte, error) {
if len(message) < 2 { if len(message) < 2 {
return []byte{}, errors.New("Failed to get consensus message payload: no data available.") return []byte{}, errors.New("Failed to get consensus message payload: no data available.")
} }
return message[ConsensusMessageTypeBytes:], nil return message[MessageTypeBytes:], nil
} }
// Concatenate msgType as one byte with payload, and return the whole byte array // Concatenate msgType as one byte with payload, and return the whole byte array
func ConstructConsensusMessage(consensusMsgType MessageType, payload []byte) []byte { func ConstructConsensusMessage(consensusMsgType MessageType, payload []byte) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.CONSENSUS)}) byteBuffer := bytes.NewBuffer([]byte{byte(proto.Consensus)})
byteBuffer.WriteByte(byte(CONSENSUS)) byteBuffer.WriteByte(byte(Consensus))
byteBuffer.WriteByte(byte(consensusMsgType)) byteBuffer.WriteByte(byte(consensusMsgType))
byteBuffer.Write(payload) byteBuffer.Write(payload)
return byteBuffer.Bytes() return byteBuffer.Bytes()

@ -8,7 +8,7 @@ import (
) )
// the number of bytes consensus message type occupies // the number of bytes consensus message type occupies
const IDENTITY_MESSAGE_TYPE_BYTES = 1 const IdentityMessageTypeBytes = 1
type IdentityMessageType byte type IdentityMessageType byte
@ -21,7 +21,7 @@ type MessageType int
const ( const (
REGISTER MessageType = iota REGISTER MessageType = iota
ANNOUNCE Announce
CONFIG CONFIG
) )
@ -29,7 +29,7 @@ const (
func (msgType MessageType) String() string { func (msgType MessageType) String() string {
names := [...]string{ names := [...]string{
"REGISTER", "REGISTER",
"ANNOUNCE", "Announce",
"CONFIG", "CONFIG",
} }
@ -52,7 +52,7 @@ func GetIdentityMessagePayload(message []byte) ([]byte, error) {
if len(message) < 2 { if len(message) < 2 {
return []byte{}, errors.New("Failed to get identity message payload: no data available.") return []byte{}, errors.New("Failed to get identity message payload: no data available.")
} }
return message[IDENTITY_MESSAGE_TYPE_BYTES:], nil return message[IdentityMessageTypeBytes:], nil
} }
// Concatenate msgType as one byte with payload, and return the whole byte array // Concatenate msgType as one byte with payload, and return the whole byte array

@ -32,7 +32,7 @@ type BlockchainSyncMessage struct {
type BlockchainSyncMessageType int type BlockchainSyncMessageType int
const ( const (
DONE BlockchainSyncMessageType = iota Done BlockchainSyncMessageType = iota
GetLastBlockHashes GetLastBlockHashes
GetBlock GetBlock
) )
@ -41,16 +41,16 @@ const (
type TransactionMessageType int type TransactionMessageType int
const ( const (
SEND TransactionMessageType = iota Send TransactionMessageType = iota
REQUEST Request
UNLOCK Unlock
) )
// BlockMessageType represents the types of messages used for NODE/BLOCK // BlockMessageType represents the types of messages used for NODE/BLOCK
type BlockMessageType int type BlockMessageType int
const ( const (
SYNC BlockMessageType = iota Sync BlockMessageType = iota
) )
// The types of messages used for NODE/BLOCK // The types of messages used for NODE/BLOCK
@ -101,7 +101,7 @@ func DeserializeBlockchainSyncMessage(d []byte) (*BlockchainSyncMessage, error)
func ConstructUnlockToCommitOrAbortMessage(txsAndProofs []*blockchain.Transaction) []byte { func ConstructUnlockToCommitOrAbortMessage(txsAndProofs []*blockchain.Transaction) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)})
byteBuffer.WriteByte(byte(Transaction)) byteBuffer.WriteByte(byte(Transaction))
byteBuffer.WriteByte(byte(UNLOCK)) byteBuffer.WriteByte(byte(Unlock))
encoder := gob.NewEncoder(byteBuffer) encoder := gob.NewEncoder(byteBuffer)
encoder.Encode(txsAndProofs) encoder.Encode(txsAndProofs)
return byteBuffer.Bytes() return byteBuffer.Bytes()
@ -124,7 +124,7 @@ func ConstructFetchUtxoMessage(sender p2p.Peer, addresses [][20]byte) []byte {
func ConstructTransactionListMessage(transactions []*blockchain.Transaction) []byte { func ConstructTransactionListMessage(transactions []*blockchain.Transaction) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)})
byteBuffer.WriteByte(byte(Transaction)) byteBuffer.WriteByte(byte(Transaction))
byteBuffer.WriteByte(byte(SEND)) byteBuffer.WriteByte(byte(Send))
encoder := gob.NewEncoder(byteBuffer) encoder := gob.NewEncoder(byteBuffer)
// Copy over the tx data // Copy over the tx data
txs := make([]blockchain.Transaction, len(transactions)) txs := make([]blockchain.Transaction, len(transactions))
@ -161,7 +161,7 @@ func GenerateBlockchainSyncMessage(payload []byte) *BlockchainSyncMessage {
func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte { func ConstructRequestTransactionsMessage(transactionIds [][]byte) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)})
byteBuffer.WriteByte(byte(Transaction)) byteBuffer.WriteByte(byte(Transaction))
byteBuffer.WriteByte(byte(REQUEST)) byteBuffer.WriteByte(byte(Request))
for _, txID := range transactionIds { for _, txID := range transactionIds {
byteBuffer.Write(txID) byteBuffer.Write(txID)
} }
@ -180,7 +180,7 @@ func ConstructStopMessage() []byte {
func ConstructBlocksSyncMessage(blocks []blockchain.Block) []byte { func ConstructBlocksSyncMessage(blocks []blockchain.Block) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)})
byteBuffer.WriteByte(byte(BLOCK)) byteBuffer.WriteByte(byte(BLOCK))
byteBuffer.WriteByte(byte(SYNC)) byteBuffer.WriteByte(byte(Sync))
encoder := gob.NewEncoder(byteBuffer) encoder := gob.NewEncoder(byteBuffer)
encoder.Encode(blocks) encoder.Encode(blocks)

@ -15,12 +15,9 @@ import (
type SyncPeerConfig struct { type SyncPeerConfig struct {
peer p2p.Peer peer p2p.Peer
conn net.Conn conn net.Conn
block *blockchain.Block
w *bufio.Writer w *bufio.Writer
receivedMsg []byte
err error err error
trusted bool trusted bool
indexes []uint16
blockHashes [][32]byte blockHashes [][32]byte
} }

@ -74,11 +74,11 @@ func (config *DistributionConfig) GetClientPort() string {
// Parse the config file and return a 2d array containing the file data // Parse the config file and return a 2d array containing the file data
func (config *DistributionConfig) ReadConfigFile(filename string) error { func (config *DistributionConfig) ReadConfigFile(filename string) error {
file, err := os.Open(filename) file, err := os.Open(filename)
defer file.Close()
if err != nil { if err != nil {
log.Fatal("Failed to read config file ", filename) log.Fatal("Failed to read config file ", filename)
return err return err
} }
defer file.Close()
fscanner := bufio.NewScanner(file) fscanner := bufio.NewScanner(file)
result := []ConfigEntry{} result := []ConfigEntry{}

Loading…
Cancel
Save