Merge branch 'master' of github.com:simple-rules/harmony-benchmark

pull/61/head
Alok Kothari 6 years ago
commit 6a338f542c
  1. 7
      README.md
  2. 21
      benchmark.go
  3. 65
      blockchain/merkle_tree.go
  4. 60
      blockchain/merkle_tree_test.go
  5. 4
      client/btctxgen/main.go
  6. 4
      client/txgen/main.go
  7. 24
      consensus/consensus.go
  8. 165
      consensus/consensus_leader.go
  9. 39
      consensus/consensus_leader_msg.go
  10. 6
      consensus/consensus_leader_msg_test.go
  11. 80
      consensus/consensus_validator.go
  12. 6
      deploy.sh
  13. 8
      node/node.go
  14. 6
      node/node_handler.go
  15. 4
      node/node_test.go
  16. 0
      waitnode/wait_node.go
  17. 0
      waitnode/wait_node_test.go

@ -24,10 +24,17 @@ cd harmony-benchmark
go get ./...
```
## Usage
### Running local test without db
```
./deploy.sh local_config.txt
```
### Running local test with db
```
./deploy.sh local_config.txt 1
```
## Testing
Make sure you the following command and make sure everything passed before submitting your code.

@ -10,6 +10,7 @@ import (
"github.com/simple-rules/harmony-benchmark/attack"
"github.com/simple-rules/harmony-benchmark/consensus"
"github.com/simple-rules/harmony-benchmark/db"
"github.com/simple-rules/harmony-benchmark/log"
"github.com/simple-rules/harmony-benchmark/node"
"github.com/simple-rules/harmony-benchmark/utils"
@ -38,12 +39,23 @@ func startProfiler(shardID string, logFolder string) {
}
}
func InitLDBDatabase(ip string, port string) (*db.LDBDatabase, error) {
// TODO(minhdoan): Refactor this.
dbFileName := "/tmp/harmony_" + ip + port + ".dat"
var err = os.RemoveAll(dbFileName)
if err != nil {
fmt.Println(err.Error())
}
return db.NewLDBDatabase(dbFileName, 0, 0)
}
func main() {
ip := flag.String("ip", "127.0.0.1", "IP of the node")
port := flag.String("port", "9000", "port of the node.")
configFile := flag.String("config_file", "config.txt", "file containing all ip addresses")
logFolder := flag.String("log_folder", "latest", "the folder collecting the logs of this execution")
attackedMode := flag.Int("attacked_mode", 0, "0 means not attacked, 1 means attacked, 2 means being open to be selected as attacked")
dbSupported := flag.Int("db_supported", 0, "0 means not db_supported, 1 means db_supported")
flag.Parse()
// Set up randomization seed.
@ -74,6 +86,13 @@ func main() {
)
log.Root().SetHandler(h)
// Initialize leveldb if dbSupported.
var ldb *db.LDBDatabase = nil
if *dbSupported == 1 {
ldb, _ = InitLDBDatabase(*ip, *port)
}
// Consensus object.
consensus := consensus.NewConsensus(*ip, *port, shardID, peers, leader)
@ -85,7 +104,7 @@ func main() {
// Set logger to attack model.
attack.GetInstance().SetLogger(consensus.Log)
// Current node.
currentNode := node.New(consensus)
currentNode := node.New(consensus, ldb)
// Create client peer.
clientPeer := distributionConfig.GetClientPeer()
// If there is a client configured in the node list.

@ -0,0 +1,65 @@
package blockchain
import (
"crypto/sha256"
)
// MerkleTree represent a Merkle tree
type MerkleTree struct {
RootNode *MerkleNode
}
// MerkleNode represent a Merkle tree node
type MerkleNode struct {
Left *MerkleNode
Right *MerkleNode
Data []byte
}
// NewMerkleTree creates a new Merkle tree from a sequence of data
func NewMerkleTree(data [][]byte) *MerkleTree {
var nodes []MerkleNode
if len(data)%2 != 0 {
data = append(data, data[len(data)-1])
}
for _, datum := range data {
node := NewMerkleNode(nil, nil, datum)
nodes = append(nodes, *node)
}
for i := 0; i < len(data)/2; i++ {
var newLevel []MerkleNode
for j := 0; j < len(nodes); j += 2 {
node := NewMerkleNode(&nodes[j], &nodes[j+1], nil)
newLevel = append(newLevel, *node)
}
nodes = newLevel
}
mTree := MerkleTree{&nodes[0]}
return &mTree
}
// NewMerkleNode creates a new Merkle tree node
func NewMerkleNode(left, right *MerkleNode, data []byte) *MerkleNode {
mNode := MerkleNode{}
if left == nil && right == nil {
hash := sha256.Sum256(data)
mNode.Data = hash[:]
} else {
prevHashes := append(left.Data, right.Data...)
hash := sha256.Sum256(prevHashes)
mNode.Data = hash[:]
}
mNode.Left = left
mNode.Right = right
return &mNode
}

@ -0,0 +1,60 @@
package blockchain
import (
"encoding/hex"
"fmt"
"testing"
)
func TestNewMerkleNode(t *testing.T) {
data := [][]byte{
[]byte("node1"),
[]byte("node2"),
[]byte("node3"),
}
// Level 1
n1 := NewMerkleNode(nil, nil, data[0])
n2 := NewMerkleNode(nil, nil, data[1])
n3 := NewMerkleNode(nil, nil, data[2])
n4 := NewMerkleNode(nil, nil, data[2])
// Level 2
n5 := NewMerkleNode(n1, n2, nil)
n6 := NewMerkleNode(n3, n4, nil)
// Level 3
n7 := NewMerkleNode(n5, n6, nil)
if hex.EncodeToString(n7.Data) != "4e3e44e55926330ab6c31892f980f8bfd1a6e910ff1ebc3f778211377f35227e" {
t.Errorf("merkle tree is not built correctly.")
}
}
func TestNewMerkleTree(t *testing.T) {
data := [][]byte{
[]byte("node1"),
[]byte("node2"),
[]byte("node3"),
}
// Level 1
n1 := NewMerkleNode(nil, nil, data[0])
n2 := NewMerkleNode(nil, nil, data[1])
n3 := NewMerkleNode(nil, nil, data[2])
n4 := NewMerkleNode(nil, nil, data[2])
// Level 2
n5 := NewMerkleNode(n1, n2, nil)
n6 := NewMerkleNode(n3, n4, nil)
// Level 3
n7 := NewMerkleNode(n5, n6, nil)
rootHash := fmt.Sprintf("%x", n7.Data)
mTree := NewMerkleTree(data)
if rootHash != fmt.Sprintf("%x", mTree.RootNode.Data) {
t.Errorf("Merkle tree root hash is incorrect")
}
}

@ -200,13 +200,13 @@ func main() {
// Nodes containing utxopools to mirror the shards' data in the network
nodes := []*node.Node{}
for _, shardID := range shardIDs {
nodes = append(nodes, node.New(&consensus.Consensus{ShardID: shardID}))
nodes = append(nodes, node.New(&consensus.Consensus{ShardID: shardID}, nil))
}
// Client/txgenerator server node setup
clientPort := configr.GetClientPort()
consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{})
clientNode := node.New(consensusObj)
clientNode := node.New(consensusObj, nil)
initClient(clientNode, clientPort, &leaders, &nodes)

@ -276,7 +276,7 @@ func main() {
// Nodes containing utxopools to mirror the shards' data in the network
nodes := []*node.Node{}
for _, shardId := range shardIds {
node := node.New(&consensus.Consensus{ShardID: shardId})
node := node.New(&consensus.Consensus{ShardID: shardId}, nil)
// Assign many fake addresses so we have enough address to play with at first
node.AddTestingAddresses(setting.numOfAddress)
nodes = append(nodes, node)
@ -285,7 +285,7 @@ func main() {
// Client/txgenerator server node setup
clientPort := configr.GetClientPort()
consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{})
clientNode := node.New(consensusObj)
clientNode := node.New(consensusObj, nil)
if clientPort != "" {
clientNode.Client = client.NewClient(&leaders)

@ -20,14 +20,17 @@ import (
type Consensus struct {
state ConsensusState
// Commits collected from validators. A map from node Id to its commitment
commitments map[uint16]kyber.Point
commitments *map[uint16]kyber.Point
finalCommitments *map[uint16]kyber.Point
aggregatedCommitment kyber.Point
aggregatedFinalCommitment kyber.Point
bitmap *crypto.Mask
finalBitmap *crypto.Mask
// Challenges
challenge [32]byte
finalChallenge [32]byte
// Commits collected from validators.
bitmap *crypto.Mask
// Responses collected from validators
responses map[uint16]kyber.Scalar
// map of nodeId to validator Peer object
@ -97,7 +100,8 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) *
consensus.IsLeader = false
}
consensus.commitments = make(map[uint16]kyber.Point)
consensus.commitments = &map[uint16]kyber.Point{}
consensus.finalCommitments = &map[uint16]kyber.Point{}
consensus.validators = make(map[uint16]p2p.Peer)
consensus.responses = make(map[uint16]kyber.Scalar)
@ -114,10 +118,15 @@ func NewConsensus(ip, port, ShardID string, peers []p2p.Peer, leader p2p.Peer) *
allPublicKeys = append(allPublicKeys, leader.PubKey)
mask, err := crypto.NewMask(crypto.Ed25519Curve, allPublicKeys, consensus.leader.PubKey)
if err != nil {
panic("Failed to create commitment mask")
panic("Failed to create mask")
}
finalMask, err := crypto.NewMask(crypto.Ed25519Curve, allPublicKeys, consensus.leader.PubKey)
if err != nil {
panic("Failed to create final mask")
}
consensus.publicKeys = allPublicKeys
consensus.bitmap = mask
consensus.finalBitmap = finalMask
// For now use socket address as 16 byte Id
// TODO: populate with correct Id
@ -170,7 +179,10 @@ func (consensus *Consensus) getValidatorPeers() []p2p.Peer {
// Reset the state of the consensus
func (consensus *Consensus) ResetState() {
consensus.state = FINISHED
consensus.commitments = make(map[uint16]kyber.Point)
consensus.commitments = &map[uint16]kyber.Point{}
consensus.bitmap.SetMask([]byte{})
consensus.finalCommitments = &map[uint16]kyber.Point{}
consensus.finalBitmap.SetMask([]byte{})
consensus.responses = make(map[uint16]kyber.Scalar)
consensus.secret = nil
}

@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"encoding/gob"
"errors"
"time"
"github.com/dedis/kyber"
@ -48,12 +49,14 @@ func (consensus *Consensus) ProcessMessageLeader(message []byte) {
}
switch msgType {
case proto_consensus.START_CONSENSUS:
consensus.processStartConsensusMessage(payload)
case proto_consensus.COMMIT:
consensus.processCommitMessage(payload)
consensus.processCommitMessage(payload, CHALLENGE_DONE)
case proto_consensus.RESPONSE:
consensus.processResponseMessage(payload)
case proto_consensus.START_CONSENSUS:
consensus.processStartConsensusMessage(payload)
case proto_consensus.FINAL_COMMIT:
consensus.processCommitMessage(payload, FINAL_CHALLENGE_DONE)
default:
consensus.Log.Error("Unexpected message type", "msgType", msgType, "consensus", consensus)
}
@ -81,16 +84,25 @@ func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) {
p2p.BroadcastMessage(consensus.getValidatorPeers(), msgToSend)
// Set state to ANNOUNCE_DONE
consensus.state = ANNOUNCE_DONE
consensus.commitByLeader(true)
}
// Leader commit to the message itself before receiving others commits
func (consensus *Consensus) commitByLeader(firstRound bool) {
// Generate leader's own commitment
secret, commitment := crypto.Commit(crypto.Ed25519Curve)
consensus.secret = secret
consensus.commitments[consensus.nodeId] = commitment
if firstRound {
(*consensus.commitments)[consensus.nodeId] = commitment
consensus.bitmap.SetKey(consensus.pubKey, true)
} else {
(*consensus.finalCommitments)[consensus.nodeId] = commitment
consensus.finalBitmap.SetKey(consensus.pubKey, true)
}
}
// Processes the commit message sent from validators
func (consensus *Consensus) processCommitMessage(payload []byte) {
func (consensus *Consensus) processCommitMessage(payload []byte, targetState ConsensusState) {
// Read payload data
offset := 0
// 4 byte consensus id
@ -137,34 +149,43 @@ func (consensus *Consensus) processCommitMessage(payload []byte) {
return
}
commitments := consensus.commitments // targetState == CHALLENGE_DONE
bitmap := consensus.bitmap
if targetState == FINAL_CHALLENGE_DONE {
commitments = consensus.finalCommitments
bitmap = consensus.finalBitmap
}
// proceed only when the message is not received before
_, ok = consensus.commitments[validatorId]
_, ok = (*commitments)[validatorId]
shouldProcess := !ok
if shouldProcess {
point := crypto.Ed25519Curve.Point()
point.UnmarshalBinary(commitment)
consensus.commitments[validatorId] = point
(*commitments)[validatorId] = point
// Set the bitmap indicate this validate signed. TODO: figure out how to resolve the inconsistency of validators from commit and response messages
consensus.bitmap.SetKey(value.PubKey, true)
bitmap.SetKey(value.PubKey, true)
}
if !shouldProcess {
return
}
if len(consensus.commitments) >= len(consensus.validators)+1 && consensus.state < CHALLENGE_DONE {
consensus.Log.Debug("Enough commitments received with signatures", "num", len(consensus.commitments))
if len((*commitments)) >= len(consensus.publicKeys) && consensus.state < targetState {
consensus.Log.Debug("Enough commitments received with signatures", "num", len((*commitments)), "state", consensus.state)
// Broadcast challenge
msgToSend := consensus.constructChallengeMessage(proto_consensus.CHALLENGE)
msgTypeToSend := proto_consensus.CHALLENGE // targetState == CHALLENGE_DONE
if targetState == FINAL_CHALLENGE_DONE {
msgTypeToSend = proto_consensus.FINAL_CHALLENGE
}
msgToSend, challengeScalar := consensus.constructChallengeMessage(msgTypeToSend)
// Add leader's response
challengeScalar := crypto.Ed25519Curve.Scalar()
challengeScalar.UnmarshalBinary(consensus.challenge[:])
response, err := crypto.Response(crypto.Ed25519Curve, consensus.priKey, consensus.secret, challengeScalar)
if err == nil {
consensus.responses[consensus.nodeId] = response
consensus.bitmap.SetKey(consensus.pubKey, true)
bitmap.SetKey(consensus.pubKey, true)
} else {
log.Warn("Failed to generate response", "err", err)
}
@ -172,8 +193,8 @@ func (consensus *Consensus) processCommitMessage(payload []byte) {
// Broadcast challenge message
p2p.BroadcastMessage(consensus.getValidatorPeers(), msgToSend)
// Set state to CHALLENGE_DONE
consensus.state = CHALLENGE_DONE
// Set state to targetState (CHALLENGE_DONE or FINAL_CHALLENGE_DONE)
consensus.state = targetState
}
}
@ -230,12 +251,20 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
_, ok = consensus.responses[validatorId]
shouldProcess = shouldProcess && !ok
if shouldProcess {
scalar := crypto.Ed25519Curve.Scalar()
scalar.UnmarshalBinary(response)
consensus.responses[validatorId] = scalar
// verify the response matches the received commit
responseScalar := crypto.Ed25519Curve.Scalar()
responseScalar.UnmarshalBinary(response)
err := consensus.verifyResponse(responseScalar, validatorId)
if err != nil {
consensus.Log.Warn("Failed to verify the response", "error", err)
shouldProcess = false
} else {
consensus.responses[validatorId] = responseScalar
// Set the bitmap indicate this validate signed. TODO: figure out how to resolve the inconsistency of validators from commit and response messages
consensus.bitmap.SetKey(value.PubKey, true)
}
}
consensus.mutex.Unlock()
if !shouldProcess {
@ -243,9 +272,9 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
}
//consensus.Log.Debug("RECEIVED RESPONSE", "consensusId", consensusId)
if len(consensus.responses) >= len(consensus.validators)+1 && consensus.state != FINISHED {
if len(consensus.responses) >= len(consensus.publicKeys) && consensus.state != FINISHED {
consensus.mutex.Lock()
if len(consensus.responses) >= len(consensus.validators)+1 && consensus.state != FINISHED {
if len(consensus.responses) >= len(consensus.publicKeys) && consensus.state != FINISHED {
consensus.Log.Debug("Enough responses received with signatures", "num", len(consensus.responses))
// Aggregate responses
responses := []kyber.Scalar{}
@ -277,43 +306,67 @@ func (consensus *Consensus) processResponseMessage(payload []byte) {
// Set state to CHALLENGE_DONE
consensus.state = COLLECTIVE_SIG_DONE
consensus.Log.Debug("Consensus reached with signatures.", "numOfSignatures", len(consensus.responses))
// Reset state to FINISHED, and clear other data.
consensus.ResetState()
consensus.consensusId++
consensus.Log.Debug("HOORAY!!! CONSENSUS REACHED!!!", "consensusId", consensus.consensusId)
// TODO: reconstruct the whole block from header and transactions
// For now, we used the stored whole block already stored in consensus.blockHeader
txDecoder := gob.NewDecoder(bytes.NewReader(consensus.blockHeader))
var blockHeaderObj blockchain.Block
err = txDecoder.Decode(&blockHeaderObj)
if err != nil {
consensus.Log.Debug("failed to construct the new block after consensus")
consensus.commitByLeader(false)
//consensus.Log.Debug("Consensus reached with signatures.", "numOfSignatures", len(consensus.responses))
//// Reset state to FINISHED, and clear other data.
//consensus.ResetState()
//consensus.consensusId++
//consensus.Log.Debug("HOORAY!!! CONSENSUS REACHED!!!", "consensusId", consensus.consensusId)
//
//// TODO: reconstruct the whole block from header and transactions
//// For now, we used the stored whole block already stored in consensus.blockHeader
//txDecoder := gob.NewDecoder(bytes.NewReader(consensus.blockHeader))
//var blockHeaderObj blockchain.Block
//err = txDecoder.Decode(&blockHeaderObj)
//if err != nil {
// consensus.Log.Debug("failed to construct the new block after consensus")
//}
//
//// Sign the block
//// TODO(RJ): populate bitmap
//copy(blockHeaderObj.Signature[:], collectiveSig[:])
//copy(blockHeaderObj.Bitmap[:], bitmap)
//consensus.OnConsensusDone(&blockHeaderObj)
//
//// TODO: @ricl these logic are irrelevant to consensus, move them to another file, say profiler.
//endTime := time.Now()
//timeElapsed := endTime.Sub(startTime)
//numOfTxs := blockHeaderObj.NumTransactions
//consensus.Log.Info("TPS Report",
// "numOfTXs", numOfTxs,
// "startTime", startTime,
// "endTime", endTime,
// "timeElapsed", timeElapsed,
// "TPS", float64(numOfTxs)/timeElapsed.Seconds(),
// "consensus", consensus)
//
//// Send signal to Node so the new block can be added and new round of consensus can be triggered
//consensus.ReadySignal <- 1
}
consensus.mutex.Unlock()
}
}
// Sign the block
// TODO(RJ): populate bitmap
copy(blockHeaderObj.Signature[:], collectiveSig[:])
copy(blockHeaderObj.Bitmap[:], bitmap)
consensus.OnConsensusDone(&blockHeaderObj)
// TODO: @ricl these logic are irrelevant to consensus, move them to another file, say profiler.
endTime := time.Now()
timeElapsed := endTime.Sub(startTime)
numOfTxs := blockHeaderObj.NumTransactions
consensus.Log.Info("TPS Report",
"numOfTXs", numOfTxs,
"startTime", startTime,
"endTime", endTime,
"timeElapsed", timeElapsed,
"TPS", float64(numOfTxs)/timeElapsed.Seconds(),
"consensus", consensus)
// Send signal to Node so the new block can be added and new round of consensus can be triggered
consensus.ReadySignal <- 1
func (consensus *Consensus) verifyResponse(response kyber.Scalar, validatorId uint16) error {
if response.Equal(crypto.Ed25519Curve.Scalar()) {
return errors.New("response is zero valued")
}
consensus.mutex.Unlock()
_, ok := (*consensus.commitments)[validatorId]
if !ok {
return errors.New("no commit is received for the validator")
}
// TODO(RJ): enable the actual check
//challenge := crypto.Ed25519Curve.Scalar()
//challenge.UnmarshalBinary(consensus.challenge[:])
//
//// compute Q = sG + r*pubKey
//sG := crypto.Ed25519Curve.Point().Mul(response, nil)
//r_pubKey := crypto.Ed25519Curve.Point().Mul(challenge, consensus.validators[validatorId].PubKey)
//Q := crypto.Ed25519Curve.Point().Add(sG, r_pubKey)
//
//if !Q.Equal(commit) {
// return errors.New("recreated commit doesn't match the received one")
//}
return nil
}

@ -37,7 +37,7 @@ func (consensus *Consensus) constructAnnounceMessage() []byte {
}
// Construct the challenge message
func (consensus *Consensus) constructChallengeMessage(msgType proto_consensus.MessageType) []byte {
func (consensus *Consensus) constructChallengeMessage(msgTypeToSend proto_consensus.MessageType) ([]byte, kyber.Scalar) {
buffer := bytes.NewBuffer([]byte{})
// 4 byte consensus id
@ -53,28 +53,45 @@ func (consensus *Consensus) constructChallengeMessage(msgType proto_consensus.Me
binary.BigEndian.PutUint16(twoBytes, consensus.nodeId)
buffer.Write(twoBytes)
commitmentsMap := consensus.commitments // msgType == CHALLENGE
bitmap := consensus.bitmap
if msgTypeToSend == proto_consensus.FINAL_CHALLENGE {
commitmentsMap = consensus.finalCommitments
bitmap = consensus.finalBitmap
}
// 33 byte aggregated commit
commitments := make([]kyber.Point, 0)
for _, val := range consensus.commitments {
for _, val := range *commitmentsMap {
commitments = append(commitments, val)
}
aggCommitment, aggCommitmentBytes := getAggregatedCommit(commitments)
buffer.Write(aggCommitmentBytes)
// 33 byte aggregated key
buffer.Write(getAggregatedKey(consensus.bitmap))
buffer.Write(getAggregatedKey(bitmap))
// 32 byte challenge
challenge := getChallenge(aggCommitment, consensus.bitmap.AggregatePublic, buffer.Bytes()[:36])
buffer.Write(challenge) // message contains consensus id and block hash for now.
copy(consensus.challenge[:], challenge)
challengeScalar := getChallenge(aggCommitment, bitmap.AggregatePublic, buffer.Bytes()[:36])
bytes, err := challengeScalar.MarshalBinary()
if err != nil {
log.Error("Failed to serialize challenge")
}
buffer.Write(bytes)
if msgTypeToSend == proto_consensus.CHALLENGE {
copy(consensus.challenge[:], bytes)
consensus.aggregatedCommitment = aggCommitment
} else if msgTypeToSend == proto_consensus.FINAL_CHALLENGE {
copy(consensus.finalChallenge[:], bytes)
consensus.aggregatedFinalCommitment = aggCommitment
}
// 64 byte of signature on previous data
signature := consensus.signMessage(buffer.Bytes())
buffer.Write(signature)
return proto_consensus.ConstructConsensusMessage(msgType, buffer.Bytes())
return proto_consensus.ConstructConsensusMessage(msgTypeToSend, buffer.Bytes()), challengeScalar
}
// Construct the collective signature message
@ -124,14 +141,10 @@ func getAggregatedKey(bitmap *crypto.Mask) []byte {
return append(bytes[:], byte(0))
}
func getChallenge(aggCommitment, aggKey kyber.Point, message []byte) []byte {
func getChallenge(aggCommitment, aggKey kyber.Point, message []byte) kyber.Scalar {
challenge, err := crypto.Challenge(crypto.Ed25519Curve, aggCommitment, aggKey, message)
if err != nil {
log.Error("Failed to generate challenge")
}
bytes, err := challenge.MarshalBinary()
if err != nil {
log.Error("Failed to serialize challenge")
}
return bytes
return challenge
}

@ -37,12 +37,12 @@ func TestConstructChallengeMessage(test *testing.T) {
consensus := NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
consensus.blockHash = [32]byte{}
consensus.commitments[0] = leaderPubKey
consensus.commitments[1] = validatorPubKey
(*consensus.commitments)[0] = leaderPubKey
(*consensus.commitments)[1] = validatorPubKey
consensus.bitmap.SetKey(leaderPubKey, true)
consensus.bitmap.SetKey(validatorPubKey, true)
msg := consensus.constructChallengeMessage(consensus_proto.CHALLENGE)
msg, _ := consensus.constructChallengeMessage(consensus_proto.CHALLENGE)
if len(msg) != 1+1+1+4+32+2+33+33+32+64 {
test.Errorf("Annouce message is not constructed in the correct size: %d", len(msg))

@ -31,7 +31,9 @@ func (consensus *Consensus) ProcessMessageValidator(message []byte) {
case proto_consensus.ANNOUNCE:
consensus.processAnnounceMessage(payload)
case proto_consensus.CHALLENGE:
consensus.processChallengeMessage(payload)
consensus.processChallengeMessage(payload, RESPONSE_DONE)
case proto_consensus.FINAL_CHALLENGE:
consensus.processChallengeMessage(payload, FINAL_RESPONSE_DONE)
case proto_consensus.COLLECTIVE_SIG:
consensus.processCollectiveSigMessage(payload)
default:
@ -129,7 +131,7 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) {
}
// Processes the challenge message sent from the leader
func (consensus *Consensus) processChallengeMessage(payload []byte) {
func (consensus *Consensus) processChallengeMessage(payload []byte, targetState ConsensusState) {
//#### Read payload data
offset := 0
// 4 byte consensus id
@ -235,46 +237,50 @@ func (consensus *Consensus) processChallengeMessage(payload []byte) {
log.Warn("Failed to generate response", "err", err)
return
}
msgToSend := consensus.constructResponseMessage(proto_consensus.RESPONSE, response)
msgTypeToSend := proto_consensus.RESPONSE
if targetState == FINAL_RESPONSE_DONE {
msgTypeToSend = proto_consensus.FINAL_RESPONSE
}
msgToSend := consensus.constructResponseMessage(msgTypeToSend, response)
p2p.SendMessage(consensus.leader, msgToSend)
// Set state to RESPONSE_DONE
consensus.state = RESPONSE_DONE
// Set state to target state (RESPONSE_DONE, FINAL_RESPONSE_DONE)
consensus.state = targetState
// BIG TODO: the block catch up logic is basically a mock now. More checks need to be done to make it correct.
// The logic is to roll up to the latest blocks one by one to try catching up with the leader.
for {
val, ok := consensus.blocksReceived[consensus.consensusId]
if ok {
delete(consensus.blocksReceived, consensus.consensusId)
consensus.blockHash = [32]byte{}
consensus.consensusId++ // roll up one by one, until the next block is not received yet.
// TODO: think about when validators know about the consensus is reached.
// For now, the blockchain is updated right here.
// TODO: reconstruct the whole block from header and transactions
// For now, we used the stored whole block in consensus.blockHeader
txDecoder := gob.NewDecoder(bytes.NewReader(val.blockHeader))
var blockHeaderObj blockchain.Block
err := txDecoder.Decode(&blockHeaderObj)
if err != nil {
consensus.Log.Debug("failed to construct the new block after consensus")
}
// check block data (transactions
if !consensus.BlockVerifier(&blockHeaderObj) {
consensus.Log.Debug("[WARNING] Block content is not verified successfully", "consensusId", consensus.consensusId)
consensus.mutex.Unlock()
return
}
consensus.OnConsensusDone(&blockHeaderObj)
} else {
break
}
}
//for {
// val, ok := consensus.blocksReceived[consensus.consensusId]
// if ok {
// delete(consensus.blocksReceived, consensus.consensusId)
//
// consensus.blockHash = [32]byte{}
// consensus.consensusId++ // roll up one by one, until the next block is not received yet.
//
// // TODO: think about when validators know about the consensus is reached.
// // For now, the blockchain is updated right here.
//
// // TODO: reconstruct the whole block from header and transactions
// // For now, we used the stored whole block in consensus.blockHeader
// txDecoder := gob.NewDecoder(bytes.NewReader(val.blockHeader))
// var blockHeaderObj blockchain.Block
// err := txDecoder.Decode(&blockHeaderObj)
// if err != nil {
// consensus.Log.Debug("failed to construct the new block after consensus")
// }
// // check block data (transactions
// if !consensus.BlockVerifier(&blockHeaderObj) {
// consensus.Log.Debug("[WARNING] Block content is not verified successfully", "consensusId", consensus.consensusId)
// consensus.mutex.Unlock()
// return
// }
// consensus.OnConsensusDone(&blockHeaderObj)
// } else {
// break
// }
//
//}
consensus.mutex.Unlock()
}
@ -325,7 +331,7 @@ func (consensus *Consensus) processCollectiveSigMessage(payload []byte) {
}
// Verify collective signature
err := crypto.Verify(crypto.Ed25519Curve, consensus.publicKeys, payload[:36], append(collectiveSig, bitmap...), crypto.NewThresholdPolicy(len(consensus.publicKeys)/2))
err := crypto.Verify(crypto.Ed25519Curve, consensus.publicKeys, payload[:36], append(collectiveSig, bitmap...), crypto.NewThresholdPolicy((2*len(consensus.publicKeys)/3)+1))
if err != nil {
consensus.Log.Warn("Failed to verify the collective sig message", "consensusId", consensusId, "err", err)
}

@ -25,6 +25,8 @@ if [ -z "$config" ]; then
usage
fi
db_supported=$2
# Kill nodes if any
./kill_node.sh
@ -48,7 +50,11 @@ while IFS='' read -r line || [[ -n "$line" ]]; do
IFS=' ' read ip port mode shardId <<< $line
#echo $ip $port $mode
if [ "$mode" != "client" ]; then
if [ -z "$db_supported" ]; then
./bin/benchmark -ip $ip -port $port -config_file $config -log_folder $log_folder&
else
./bin/benchmark -ip $ip -port $port -config_file $config -log_folder $log_folder -db_supported 1&
fi
fi
done < $config

@ -9,6 +9,7 @@ import (
"github.com/simple-rules/harmony-benchmark/client"
"github.com/simple-rules/harmony-benchmark/consensus"
"github.com/simple-rules/harmony-benchmark/crypto/pki"
"github.com/simple-rules/harmony-benchmark/db"
"github.com/simple-rules/harmony-benchmark/log"
"github.com/simple-rules/harmony-benchmark/p2p"
)
@ -21,6 +22,7 @@ type Node struct {
pendingTransactions []*blockchain.Transaction // All the transactions received but not yet processed for Consensus
transactionInConsensus []*blockchain.Transaction // The transactions selected into the new block and under Consensus process
blockchain *blockchain.Blockchain // The blockchain for the shard where this node belongs
db *db.LDBDatabase // LevelDB to store blockchain.
UtxoPool *blockchain.UTXOPool // The corresponding UTXO pool of the current blockchain
CrossTxsInConsensus []*blockchain.CrossShardTxAndProof // The cross shard txs that is under consensus, the proof is not filled yet.
CrossTxsToReturn []*blockchain.CrossShardTxAndProof // The cross shard txs and proof that needs to be sent back to the user client.
@ -95,7 +97,7 @@ func (node *Node) countNumTransactionsInBlockchain() int {
}
// Create a new Node
func New(consensus *consensus.Consensus) *Node {
func New(consensus *consensus.Consensus, db *db.LDBDatabase) *Node {
node := Node{}
// Consensus and associated channel to communicate blocks
@ -116,5 +118,9 @@ func New(consensus *consensus.Consensus) *Node {
// Logger
node.log = node.Consensus.Log
// Initialize level db.
node.db = db
return &node
}

@ -5,6 +5,7 @@ import (
"encoding/gob"
"net"
"os"
"strconv"
"time"
"github.com/simple-rules/harmony-benchmark/blockchain"
@ -278,6 +279,11 @@ func (node *Node) PostConsensusProcessing(newBlock *blockchain.Block) {
func (node *Node) AddNewBlock(newBlock *blockchain.Block) {
// Add it to blockchain
node.blockchain.Blocks = append(node.blockchain.Blocks, newBlock)
// Store it into leveldb.
if node.db != nil {
node.log.Info("Writing new block into disk.")
newBlock.Write(node.db, strconv.Itoa(len(node.blockchain.Blocks)))
}
// Update UTXO pool
node.UtxoPool.Update(newBlock.Transactions)
// Clear transaction-in-Consensus list

@ -12,7 +12,7 @@ func TestNewNewNode(test *testing.T) {
validator := p2p.Peer{Ip: "3", Port: "5"}
consensus := consensus.NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
node := New(consensus)
node := New(consensus, nil)
if node.Consensus == nil {
test.Error("Consensus is not initialized for the node")
}
@ -39,7 +39,7 @@ func TestCountNumTransactionsInBlockchain(test *testing.T) {
validator := p2p.Peer{Ip: "3", Port: "5"}
consensus := consensus.NewConsensus("1", "2", "0", []p2p.Peer{leader, validator}, leader)
node := New(consensus)
node := New(consensus, nil)
node.AddTestingAddresses(1000)
if node.countNumTransactionsInBlockchain() != 1001 {
test.Error("Count of transactions in the blockchain is incorrect")

Loading…
Cancel
Save