Merge branch 'master' into sync

pull/69/head
Minh Doan 6 years ago
commit f68652b74e
  1. 4
      blockchain/utxopool.go
  2. 21
      client/btctxgen/main.go
  3. 54
      client/client.go
  4. 10
      client/config/config.go
  5. 72
      client/txgen/main.go
  6. 8
      client/wallet/main.go
  7. 4
      consensus/consensus_leader.go
  8. 3
      consensus/consensus_validator.go
  9. 2
      proto/node/node.go

@ -9,6 +9,7 @@ import (
"github.com/dedis/kyber/sign/schnorr"
"github.com/simple-rules/harmony-benchmark/crypto"
"github.com/simple-rules/harmony-benchmark/log"
"math/rand"
"sync"
)
@ -408,6 +409,9 @@ func (utxoPool *UTXOPool) SelectTransactionsForNewBlock(transactions []*Transact
err, crossShard := utxoPool.VerifyOneTransaction(tx, &spentTXOs)
if len(selected) < maxNumTxs {
if err != nil && rand.Intn(10) < 1 {
log.Warn("Invalid Transaction", "Reason", err)
}
if err == nil || crossShard {
if crossShard {
proof := CrossShardTxProof{Accept: err == nil, TxID: tx.ID, TxInput: getShardTxInput(tx, utxoPool.ShardID)}

@ -137,12 +137,12 @@ LOOP:
return txs, crossTxs
}
func initClient(clientNode *node.Node, clientPort string, leaders *[]p2p.Peer, nodes *[]*node.Node) {
func initClient(clientNode *node.Node, clientPort string, shardIdLeaderMap *map[uint32]p2p.Peer, nodes *[]*node.Node) {
if clientPort == "" {
return
}
clientNode.Client = client.NewClient(leaders)
clientNode.Client = client.NewClient(shardIdLeaderMap)
// This func is used to update the client's utxopool when new blocks are received from the leaders
updateBlocksFunc := func(blocks []*blockchain.Block) {
@ -178,10 +178,10 @@ func main() {
// Read the configs
config := client_config.NewConfig()
config.ReadConfigFile(*configFile)
leaders, shardIDs := config.GetLeadersAndShardIds()
shardIdLeaderMap := config.GetShardIdToLeaderMap()
// Do cross shard tx if there are more than one shard
setting.crossShard = len(shardIDs) > 1
setting.crossShard = len(shardIdLeaderMap) > 1
setting.maxNumTxsPerBatch = *maxNumTxsPerBatch
// TODO(Richard): refactor this chuck to a single method
@ -199,7 +199,7 @@ func main() {
// Nodes containing utxopools to mirror the shards' data in the network
nodes := []*node.Node{}
for _, shardID := range shardIDs {
for shardID, _ := range shardIdLeaderMap {
nodes = append(nodes, node.New(&consensus.Consensus{ShardID: shardID}, nil))
}
@ -208,13 +208,18 @@ func main() {
consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{})
clientNode := node.New(consensusObj, nil)
initClient(clientNode, clientPort, &leaders, &nodes)
initClient(clientNode, clientPort, &shardIdLeaderMap, &nodes)
// Transaction generation process
time.Sleep(10 * time.Second) // wait for nodes to be ready
start := time.Now()
totalTime := 300.0 //run for 5 minutes
leaders := []p2p.Peer{}
for _, leader := range shardIdLeaderMap {
leaders = append(leaders, leader)
}
for true {
t := time.Now()
if t.Sub(start).Seconds() >= totalTime {
@ -224,8 +229,8 @@ func main() {
allCrossTxs := []*blockchain.Transaction{}
// Generate simulated transactions
for i, leader := range leaders {
txs, crossTxs := generateSimulatedTransactions(i, nodes)
for shardId, leader := range shardIdLeaderMap {
txs, crossTxs := generateSimulatedTransactions(int(shardId), nodes)
allCrossTxs = append(allCrossTxs, crossTxs...)
log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs), "numCrossTxs", len(crossTxs), "block height", btcTXIter.GetBlockIndex())

@ -16,7 +16,7 @@ import (
type Client struct {
PendingCrossTxs map[[32]byte]*blockchain.Transaction // Map of TxId to pending cross shard txs. Pending means the proof-of-accept/rejects are not complete
PendingCrossTxsMutex sync.Mutex // Mutex for the pending txs list
Leaders *[]p2p.Peer // All the leaders for each shard
Leaders *map[uint32]p2p.Peer // Map of shard Id and corresponding leader
UpdateBlocks func([]*blockchain.Block) // Closure function used to sync new block with the leader. Once the leader finishes the consensus on a new block, it will send it to the clients. Clients use this method to update their blockchain
ShardUtxoMap map[uint32]blockchain.UtxoMap
@ -56,12 +56,12 @@ func (client *Client) TransactionMessageHandler(msgPayload []byte) {
// 3) checks whether all input utxos of the transaction have a corresponding proof.
// 4) for all transactions with full proofs, broadcast them back to the leaders
func (client *Client) handleProofOfLockMessage(proofs *[]blockchain.CrossShardTxProof) {
txsToSend := []blockchain.Transaction{}
txsToSend := []*blockchain.Transaction{}
//fmt.Printf("PENDING CLIENT TX - %d\n", len(client.PendingCrossTxs))
// Loop through the newly received list of proofs
client.PendingCrossTxsMutex.Lock()
log.Info("CLIENT PENDING TX", "num", len(client.PendingCrossTxs))
log.Info("CLIENT PENDING CROSS TX", "num", len(client.PendingCrossTxs))
for _, proof := range *proofs {
// Find the corresponding pending cross tx
txAndProofs, ok := client.PendingCrossTxs[proof.TxID]
@ -89,7 +89,7 @@ func (client *Client) handleProofOfLockMessage(proofs *[]blockchain.CrossShardTx
}
if readyToUnlock {
txsToSend = append(txsToSend, *txAndProofs)
txsToSend = append(txsToSend, txAndProofs)
}
}
@ -101,7 +101,7 @@ func (client *Client) handleProofOfLockMessage(proofs *[]blockchain.CrossShardTx
// Broadcast the cross txs with full proofs for unlock-to-commit/abort
if len(txsToSend) != 0 {
client.broadcastCrossShardTxUnlockMessage(&txsToSend)
client.sendCrossShardTxUnlockMessage(txsToSend)
}
}
@ -115,12 +115,14 @@ func (client *Client) handleFetchUtxoResponseMessage(utxoResponse client_proto.F
client.ShardUtxoMap[utxoResponse.ShardId] = utxoResponse.UtxoMap
}
func (client *Client) broadcastCrossShardTxUnlockMessage(txsToSend *[]blockchain.Transaction) {
p2p.BroadcastMessage(*client.Leaders, node.ConstructUnlockToCommitOrAbortMessage(*txsToSend))
func (client *Client) sendCrossShardTxUnlockMessage(txsToSend []*blockchain.Transaction) {
for shardId, txs := range BuildOutputShardTransactionMap(txsToSend) {
p2p.SendMessage((*client.Leaders)[shardId], node.ConstructUnlockToCommitOrAbortMessage(txs))
}
}
// Create a new Client
func NewClient(leaders *[]p2p.Peer) *Client {
func NewClient(leaders *map[uint32]p2p.Peer) *Client {
client := Client{}
client.PendingCrossTxs = make(map[[32]byte]*blockchain.Transaction)
client.Leaders = leaders
@ -129,3 +131,39 @@ func NewClient(leaders *[]p2p.Peer) *Client {
client.log = log.New()
return &client
}
func BuildOutputShardTransactionMap(txs []*blockchain.Transaction) map[uint32][]*blockchain.Transaction {
txsShardMap := make(map[uint32][]*blockchain.Transaction)
// Put txs into corresponding output shards
for _, crossTx := range txs {
for curShardId, _ := range GetOutputShardIdsOfCrossShardTx(crossTx) {
txsShardMap[curShardId] = append(txsShardMap[curShardId], crossTx)
}
}
return txsShardMap
}
func GetInputShardIdsOfCrossShardTx(crossTx *blockchain.Transaction) map[uint32]bool {
shardIds := map[uint32]bool{}
for _, txInput := range crossTx.TxInput {
shardIds[txInput.ShardID] = true
}
return shardIds
}
func GetOutputShardIdsOfCrossShardTx(crossTx *blockchain.Transaction) map[uint32]bool {
shardIds := map[uint32]bool{}
for _, txOutput := range crossTx.TxOutput {
shardIds[txOutput.ShardID] = true
}
return shardIds
}
func (client *Client) GetLeaders() []p2p.Peer {
leaders := []p2p.Peer{}
for _, leader := range *client.Leaders {
leaders = append(leaders, leader)
}
return leaders
}

@ -40,21 +40,19 @@ func (config *Config) GetValidators() []p2p.Peer {
}
// Gets all the leader peers and corresponding shard Ids
func (config *Config) GetLeadersAndShardIds() ([]p2p.Peer, []uint32) {
var peerList []p2p.Peer
var shardIDs []uint32
func (config *Config) GetShardIdToLeaderMap() map[uint32]p2p.Peer {
shardIdLeaderMap := map[uint32]p2p.Peer{}
for _, entry := range config.config {
if entry.Role == "leader" {
peerList = append(peerList, p2p.Peer{Ip: entry.IP, Port: entry.Port})
val, err := strconv.Atoi(entry.ShardID)
if err == nil {
shardIDs = append(shardIDs, uint32(val))
shardIdLeaderMap[uint32(val)] = p2p.Peer{Ip: entry.IP, Port: entry.Port}
} else {
log.Print("[Generator] Error parsing the shard Id ", entry.ShardID)
}
}
}
return peerList, shardIDs
return shardIdLeaderMap
}
func (config *Config) GetClientPeer() *p2p.Peer {

@ -254,11 +254,11 @@ func main() {
// Read the configs
config := client_config.NewConfig()
config.ReadConfigFile(*configFile)
leaders, shardIds := config.GetLeadersAndShardIds()
shardIdLeaderMap := config.GetShardIdToLeaderMap()
setting.numOfAddress = 10000
// Do cross shard tx if there are more than one shard
setting.crossShard = len(shardIds) > 1
setting.crossShard = len(shardIdLeaderMap) > 1
setting.maxNumTxsPerBatch = *maxNumTxsPerBatch
setting.crossShardRatio = *crossShardRatio
@ -273,7 +273,7 @@ func main() {
// Nodes containing utxopools to mirror the shards' data in the network
nodes := []*node.Node{}
for _, shardId := range shardIds {
for shardId, _ := range shardIdLeaderMap {
node := node.New(&consensus.Consensus{ShardID: shardId}, nil)
// Assign many fake addresses so we have enough address to play with at first
node.AddTestingAddresses(setting.numOfAddress)
@ -286,7 +286,7 @@ func main() {
clientNode := node.New(consensusObj, nil)
if clientPort != "" {
clientNode.Client = client.NewClient(&leaders)
clientNode.Client = client.NewClient(&shardIdLeaderMap)
// This func is used to update the client's utxopool when new blocks are received from the leaders
updateBlocksFunc := func(blocks []*blockchain.Block) {
@ -319,53 +319,51 @@ func main() {
start := time.Now()
totalTime := float64(*duration)
batchCounter := 0
subsetCounter := 0
for true {
t := time.Now()
if totalTime > 0 && t.Sub(start).Seconds() >= totalTime {
log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime)
break
}
for shardId := 0; shardId < len(nodes); shardId++ {
constructAndSendTransaction(batchCounter, *numSubset, shardId, leaders, nodes, clientNode, clientPort)
shardIdTxsMap := make(map[uint32][]*blockchain.Transaction)
for shardId, _ := range shardIdLeaderMap { // Generate simulated transactions
txs, crossTxs := generateSimulatedTransactions(subsetCounter, *numSubset, int(shardId), nodes)
// Put cross shard tx into a pending list waiting for proofs from leaders
if clientPort != "" {
clientNode.Client.PendingCrossTxsMutex.Lock()
for _, tx := range crossTxs {
clientNode.Client.PendingCrossTxs[tx.ID] = tx
}
clientNode.Client.PendingCrossTxsMutex.Unlock()
}
// Put txs into corresponding shards
shardIdTxsMap[shardId] = append(shardIdTxsMap[shardId], txs...)
for _, crossTx := range crossTxs {
for curShardId, _ := range client.GetInputShardIdsOfCrossShardTx(crossTx) {
shardIdTxsMap[curShardId] = append(shardIdTxsMap[curShardId], crossTx)
}
}
}
for shardId, txs := range shardIdTxsMap { // Send the txs to corresponding shards
SendTxsToLeader(shardIdLeaderMap[shardId], txs)
}
batchCounter++
time.Sleep(5000 * time.Millisecond)
subsetCounter++
time.Sleep(2000 * time.Millisecond)
}
// Send a stop message to stop the nodes at the end
msg := proto_node.ConstructStopMessage()
peers := append(config.GetValidators(), leaders...)
peers := append(config.GetValidators(), clientNode.Client.GetLeaders()...)
p2p.BroadcastMessage(peers, msg)
}
func constructAndSendTransaction(subsetId, numSubset, shardId int, leaders []p2p.Peer, nodes []*node.Node, clientNode *node.Node, clientPort string) {
allCrossTxs := []*blockchain.Transaction{}
// Generate simulated transactions
leader := leaders[shardId]
txs, crossTxs := generateSimulatedTransactions(subsetId, numSubset, shardId, nodes)
allCrossTxs = append(allCrossTxs, crossTxs...)
log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs))
func SendTxsToLeader(leader p2p.Peer, txs []*blockchain.Transaction) {
log.Debug("[Generator] Sending txs to...", "leader", leader, "numTxs", len(txs))
msg := proto_node.ConstructTransactionListMessage(txs)
p2p.SendMessage(leader, msg)
// Note cross shard txs are later sent in batch
if len(allCrossTxs) > 0 {
log.Debug("[Generator] Broadcasting cross-shard txs ...", "allCrossTxs", len(allCrossTxs))
//fmt.Printf("SENDING CLIENT TXS: %d\n", shardId)
//fmt.Println(allCrossTxs)
msg := proto_node.ConstructTransactionListMessage(allCrossTxs)
p2p.BroadcastMessage(leaders, msg)
// Put cross shard tx into a pending list waiting for proofs from leaders
if clientPort != "" {
clientNode.Client.PendingCrossTxsMutex.Lock()
for _, tx := range allCrossTxs {
clientNode.Client.PendingCrossTxs[tx.ID] = tx
}
clientNode.Client.PendingCrossTxsMutex.Unlock()
}
}
}

@ -237,10 +237,10 @@ func getLeaders() []p2p.Peer {
func CreateWalletServerNode() *node.Node {
configr := client_config.NewConfig()
configr.ReadConfigFile("local_config_shards.txt")
leaders, _ := configr.GetLeadersAndShardIds()
shardIdLeaderMap := configr.GetShardIdToLeaderMap()
clientPeer := configr.GetClientPeer()
walletNode := node.New(nil, nil)
walletNode.Client = client.NewClient(&leaders)
walletNode.Client = client.NewClient(&shardIdLeaderMap)
walletNode.ClientPeer = clientPeer
return walletNode
}
@ -254,7 +254,7 @@ func ExecuteTransaction(tx blockchain.Transaction, walletNode *node.Node) error
}
msg := proto_node.ConstructTransactionListMessage([]*blockchain.Transaction{&tx})
p2p.BroadcastMessage(*walletNode.Client.Leaders, msg)
p2p.BroadcastMessage(walletNode.Client.GetLeaders(), msg)
doneSignal := make(chan int)
go func() {
@ -279,7 +279,7 @@ func ExecuteTransaction(tx blockchain.Transaction, walletNode *node.Node) error
func FetchUtxos(addresses [][20]byte, walletNode *node.Node) (map[uint32]blockchain.UtxoMap, error) {
fmt.Println("Fetching account balance...")
walletNode.Client.ShardUtxoMap = make(map[uint32]blockchain.UtxoMap)
p2p.BroadcastMessage(*walletNode.Client.Leaders, proto_node.ConstructFetchUtxoMessage(*walletNode.ClientPeer, addresses))
p2p.BroadcastMessage(walletNode.Client.GetLeaders(), proto_node.ConstructFetchUtxoMessage(*walletNode.ClientPeer, addresses))
doneSignal := make(chan int)
go func() {

@ -168,6 +168,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Con
point := crypto.Ed25519Curve.Point()
point.UnmarshalBinary(commitment)
(*commitments)[validatorId] = point
consensus.Log.Debug("Received new commit message", "num", len(*commitments))
// Set the bitmap indicate this validate signed. TODO: figure out how to resolve the inconsistency of validators from commit and response messages
bitmap.SetKey(value.PubKey, true)
}
@ -177,7 +178,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Con
}
if len((*commitments)) >= len(consensus.publicKeys) && consensus.state < targetState {
consensus.Log.Debug("Enough commitments received with signatures", "num", len((*commitments)), "state", consensus.state)
consensus.Log.Debug("Enough commitments received with signatures", "num", len(*commitments), "state", consensus.state)
// Broadcast challenge
msgTypeToSend := proto_consensus.CHALLENGE // targetState == CHALLENGE_DONE
@ -297,6 +298,7 @@ func (consensus *Consensus) processResponseMessage(payload []byte, targetState C
shouldProcess = false
} else {
(*responses)[validatorId] = responseScalar
consensus.Log.Debug("Received new response message", "num", len(*responses))
// Set the bitmap indicate this validate signed. TODO: figure out how to resolve the inconsistency of validators from commit and response messages
consensus.bitmap.SetKey(value.PubKey, true)
}

@ -27,6 +27,7 @@ func (consensus *Consensus) ProcessMessageValidator(message []byte) {
consensus.Log.Error("Failed to get consensus message payload", "err", err, "consensus", consensus)
}
consensus.Log.Info("Received consensus Message", "type", msgType)
switch msgType {
case proto_consensus.ANNOUNCE:
consensus.processAnnounceMessage(payload)
@ -68,8 +69,6 @@ func (consensus *Consensus) processAnnounceMessage(payload []byte) {
offset += 64
//#### END: Read payload data
consensus.Log.Info("Received Announce Message", "LeaderId", leaderId)
copy(consensus.blockHash[:], blockHash[:])
// Verify block data

@ -71,7 +71,7 @@ type FetchUtxoMessage struct {
}
// [client] Constructs the unlock to commit or abort message that will be sent to leaders
func ConstructUnlockToCommitOrAbortMessage(txsAndProofs []blockchain.Transaction) []byte {
func ConstructUnlockToCommitOrAbortMessage(txsAndProofs []*blockchain.Transaction) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)})
byteBuffer.WriteByte(byte(TRANSACTION))
byteBuffer.WriteByte(byte(UNLOCK))

Loading…
Cancel
Save