Optimize txgen to send txs only to affected shards

pull/69/head
Rongjian Lan 6 years ago
parent f6963f44af
commit 45768265a5
  1. 21
      client/btctxgen/main.go
  2. 52
      client/client.go
  3. 10
      client/config/config.go
  4. 72
      client/txgen/main.go
  5. 8
      client/wallet/main.go
  6. 2
      proto/node/node.go

@ -137,12 +137,12 @@ LOOP:
return txs, crossTxs
}
func initClient(clientNode *node.Node, clientPort string, leaders *[]p2p.Peer, nodes *[]*node.Node) {
func initClient(clientNode *node.Node, clientPort string, shardIdLeaderMap *map[uint32]p2p.Peer, nodes *[]*node.Node) {
if clientPort == "" {
return
}
clientNode.Client = client.NewClient(leaders)
clientNode.Client = client.NewClient(shardIdLeaderMap)
// This func is used to update the client's utxopool when new blocks are received from the leaders
updateBlocksFunc := func(blocks []*blockchain.Block) {
@ -178,10 +178,10 @@ func main() {
// Read the configs
config := client_config.NewConfig()
config.ReadConfigFile(*configFile)
leaders, shardIDs := config.GetLeadersAndShardIds()
shardIdLeaderMap := config.GetShardIdToLeaderMap()
// Do cross shard tx if there are more than one shard
setting.crossShard = len(shardIDs) > 1
setting.crossShard = len(shardIdLeaderMap) > 1
setting.maxNumTxsPerBatch = *maxNumTxsPerBatch
// TODO(Richard): refactor this chuck to a single method
@ -199,7 +199,7 @@ func main() {
// Nodes containing utxopools to mirror the shards' data in the network
nodes := []*node.Node{}
for _, shardID := range shardIDs {
for shardID, _ := range shardIdLeaderMap {
nodes = append(nodes, node.New(&consensus.Consensus{ShardID: shardID}, nil))
}
@ -208,13 +208,18 @@ func main() {
consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{})
clientNode := node.New(consensusObj, nil)
initClient(clientNode, clientPort, &leaders, &nodes)
initClient(clientNode, clientPort, &shardIdLeaderMap, &nodes)
// Transaction generation process
time.Sleep(10 * time.Second) // wait for nodes to be ready
start := time.Now()
totalTime := 300.0 //run for 5 minutes
leaders := []p2p.Peer{}
for _, leader := range shardIdLeaderMap {
leaders = append(leaders, leader)
}
for true {
t := time.Now()
if t.Sub(start).Seconds() >= totalTime {
@ -224,8 +229,8 @@ func main() {
allCrossTxs := []*blockchain.Transaction{}
// Generate simulated transactions
for i, leader := range leaders {
txs, crossTxs := generateSimulatedTransactions(i, nodes)
for shardId, leader := range shardIdLeaderMap {
txs, crossTxs := generateSimulatedTransactions(int(shardId), nodes)
allCrossTxs = append(allCrossTxs, crossTxs...)
log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs), "numCrossTxs", len(crossTxs), "block height", btcTXIter.GetBlockIndex())

@ -16,7 +16,7 @@ import (
type Client struct {
PendingCrossTxs map[[32]byte]*blockchain.Transaction // Map of TxId to pending cross shard txs. Pending means the proof-of-accept/rejects are not complete
PendingCrossTxsMutex sync.Mutex // Mutex for the pending txs list
Leaders *[]p2p.Peer // All the leaders for each shard
Leaders *map[uint32]p2p.Peer // Map of shard Id and corresponding leader
UpdateBlocks func([]*blockchain.Block) // Closure function used to sync new block with the leader. Once the leader finishes the consensus on a new block, it will send it to the clients. Clients use this method to update their blockchain
ShardUtxoMap map[uint32]blockchain.UtxoMap
@ -56,7 +56,7 @@ func (client *Client) TransactionMessageHandler(msgPayload []byte) {
// 3) checks whether all input utxos of the transaction have a corresponding proof.
// 4) for all transactions with full proofs, broadcast them back to the leaders
func (client *Client) handleProofOfLockMessage(proofs *[]blockchain.CrossShardTxProof) {
txsToSend := []blockchain.Transaction{}
txsToSend := []*blockchain.Transaction{}
//fmt.Printf("PENDING CLIENT TX - %d\n", len(client.PendingCrossTxs))
// Loop through the newly received list of proofs
@ -89,7 +89,7 @@ func (client *Client) handleProofOfLockMessage(proofs *[]blockchain.CrossShardTx
}
if readyToUnlock {
txsToSend = append(txsToSend, *txAndProofs)
txsToSend = append(txsToSend, txAndProofs)
}
}
@ -101,7 +101,7 @@ func (client *Client) handleProofOfLockMessage(proofs *[]blockchain.CrossShardTx
// Broadcast the cross txs with full proofs for unlock-to-commit/abort
if len(txsToSend) != 0 {
client.broadcastCrossShardTxUnlockMessage(&txsToSend)
client.sendCrossShardTxUnlockMessage(txsToSend)
}
}
@ -115,12 +115,14 @@ func (client *Client) handleFetchUtxoResponseMessage(utxoResponse client_proto.F
client.ShardUtxoMap[utxoResponse.ShardId] = utxoResponse.UtxoMap
}
func (client *Client) broadcastCrossShardTxUnlockMessage(txsToSend *[]blockchain.Transaction) {
p2p.BroadcastMessage(*client.Leaders, node.ConstructUnlockToCommitOrAbortMessage(*txsToSend))
func (client *Client) sendCrossShardTxUnlockMessage(txsToSend []*blockchain.Transaction) {
for shardId, txs := range BuildOutputShardTransactionMap(txsToSend) {
p2p.SendMessage((*client.Leaders)[shardId], node.ConstructUnlockToCommitOrAbortMessage(txs))
}
}
// Create a new Client
func NewClient(leaders *[]p2p.Peer) *Client {
func NewClient(leaders *map[uint32]p2p.Peer) *Client {
client := Client{}
client.PendingCrossTxs = make(map[[32]byte]*blockchain.Transaction)
client.Leaders = leaders
@ -129,3 +131,39 @@ func NewClient(leaders *[]p2p.Peer) *Client {
client.log = log.New()
return &client
}
func BuildOutputShardTransactionMap(txs []*blockchain.Transaction) map[uint32][]*blockchain.Transaction {
txsShardMap := make(map[uint32][]*blockchain.Transaction)
// Put txs into corresponding output shards
for _, crossTx := range txs {
for curShardId, _ := range GetOutputShardIdsOfCrossShardTx(crossTx) {
txsShardMap[curShardId] = append(txsShardMap[curShardId], crossTx)
}
}
return txsShardMap
}
func GetInputShardIdsOfCrossShardTx(crossTx *blockchain.Transaction) map[uint32]bool {
shardIds := map[uint32]bool{}
for _, txInput := range crossTx.TxInput {
shardIds[txInput.ShardID] = true
}
return shardIds
}
func GetOutputShardIdsOfCrossShardTx(crossTx *blockchain.Transaction) map[uint32]bool {
shardIds := map[uint32]bool{}
for _, txOutput := range crossTx.TxOutput {
shardIds[txOutput.ShardID] = true
}
return shardIds
}
func (client *Client) GetLeaders() []p2p.Peer {
leaders := []p2p.Peer{}
for _, leader := range *client.Leaders {
leaders = append(leaders, leader)
}
return leaders
}

@ -40,21 +40,19 @@ func (config *Config) GetValidators() []p2p.Peer {
}
// Gets all the leader peers and corresponding shard Ids
func (config *Config) GetLeadersAndShardIds() ([]p2p.Peer, []uint32) {
var peerList []p2p.Peer
var shardIDs []uint32
func (config *Config) GetShardIdToLeaderMap() map[uint32]p2p.Peer {
shardIdLeaderMap := map[uint32]p2p.Peer{}
for _, entry := range config.config {
if entry.Role == "leader" {
peerList = append(peerList, p2p.Peer{Ip: entry.IP, Port: entry.Port})
val, err := strconv.Atoi(entry.ShardID)
if err == nil {
shardIDs = append(shardIDs, uint32(val))
shardIdLeaderMap[uint32(val)] = p2p.Peer{Ip: entry.IP, Port: entry.Port}
} else {
log.Print("[Generator] Error parsing the shard Id ", entry.ShardID)
}
}
}
return peerList, shardIDs
return shardIdLeaderMap
}
func (config *Config) GetClientPeer() *p2p.Peer {

@ -254,11 +254,11 @@ func main() {
// Read the configs
config := client_config.NewConfig()
config.ReadConfigFile(*configFile)
leaders, shardIds := config.GetLeadersAndShardIds()
shardIdLeaderMap := config.GetShardIdToLeaderMap()
setting.numOfAddress = 10000
// Do cross shard tx if there are more than one shard
setting.crossShard = len(shardIds) > 1
setting.crossShard = len(shardIdLeaderMap) > 1
setting.maxNumTxsPerBatch = *maxNumTxsPerBatch
setting.crossShardRatio = *crossShardRatio
@ -273,7 +273,7 @@ func main() {
// Nodes containing utxopools to mirror the shards' data in the network
nodes := []*node.Node{}
for _, shardId := range shardIds {
for shardId, _ := range shardIdLeaderMap {
node := node.New(&consensus.Consensus{ShardID: shardId}, nil)
// Assign many fake addresses so we have enough address to play with at first
node.AddTestingAddresses(setting.numOfAddress)
@ -286,7 +286,7 @@ func main() {
clientNode := node.New(consensusObj, nil)
if clientPort != "" {
clientNode.Client = client.NewClient(&leaders)
clientNode.Client = client.NewClient(&shardIdLeaderMap)
// This func is used to update the client's utxopool when new blocks are received from the leaders
updateBlocksFunc := func(blocks []*blockchain.Block) {
@ -319,53 +319,51 @@ func main() {
start := time.Now()
totalTime := float64(*duration)
batchCounter := 0
subsetCounter := 0
for true {
t := time.Now()
if totalTime > 0 && t.Sub(start).Seconds() >= totalTime {
log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime)
break
}
for shardId := 0; shardId < len(nodes); shardId++ {
constructAndSendTransaction(batchCounter, *numSubset, shardId, leaders, nodes, clientNode, clientPort)
shardIdTxsMap := make(map[uint32][]*blockchain.Transaction)
for shardId, _ := range shardIdLeaderMap { // Generate simulated transactions
txs, crossTxs := generateSimulatedTransactions(subsetCounter, *numSubset, int(shardId), nodes)
// Put cross shard tx into a pending list waiting for proofs from leaders
if clientPort != "" {
clientNode.Client.PendingCrossTxsMutex.Lock()
for _, tx := range crossTxs {
clientNode.Client.PendingCrossTxs[tx.ID] = tx
}
clientNode.Client.PendingCrossTxsMutex.Unlock()
}
// Put txs into corresponding shards
shardIdTxsMap[shardId] = append(shardIdTxsMap[shardId], txs...)
for _, crossTx := range crossTxs {
for curShardId, _ := range client.GetInputShardIdsOfCrossShardTx(crossTx) {
shardIdTxsMap[curShardId] = append(shardIdTxsMap[curShardId], crossTx)
}
}
}
for shardId, txs := range shardIdTxsMap { // Send the txs to corresponding shards
SendTxsToLeader(shardIdLeaderMap[shardId], txs)
}
batchCounter++
time.Sleep(5000 * time.Millisecond)
subsetCounter++
time.Sleep(2000 * time.Millisecond)
}
// Send a stop message to stop the nodes at the end
msg := proto_node.ConstructStopMessage()
peers := append(config.GetValidators(), leaders...)
peers := append(config.GetValidators(), clientNode.Client.GetLeaders()...)
p2p.BroadcastMessage(peers, msg)
}
func constructAndSendTransaction(subsetId, numSubset, shardId int, leaders []p2p.Peer, nodes []*node.Node, clientNode *node.Node, clientPort string) {
allCrossTxs := []*blockchain.Transaction{}
// Generate simulated transactions
leader := leaders[shardId]
txs, crossTxs := generateSimulatedTransactions(subsetId, numSubset, shardId, nodes)
allCrossTxs = append(allCrossTxs, crossTxs...)
log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs))
func SendTxsToLeader(leader p2p.Peer, txs []*blockchain.Transaction) {
log.Debug("[Generator] Sending txs to...", "leader", leader, "numTxs", len(txs))
msg := proto_node.ConstructTransactionListMessage(txs)
p2p.SendMessage(leader, msg)
// Note cross shard txs are later sent in batch
if len(allCrossTxs) > 0 {
log.Debug("[Generator] Broadcasting cross-shard txs ...", "allCrossTxs", len(allCrossTxs))
//fmt.Printf("SENDING CLIENT TXS: %d\n", shardId)
//fmt.Println(allCrossTxs)
msg := proto_node.ConstructTransactionListMessage(allCrossTxs)
p2p.BroadcastMessage(leaders, msg)
// Put cross shard tx into a pending list waiting for proofs from leaders
if clientPort != "" {
clientNode.Client.PendingCrossTxsMutex.Lock()
for _, tx := range allCrossTxs {
clientNode.Client.PendingCrossTxs[tx.ID] = tx
}
clientNode.Client.PendingCrossTxsMutex.Unlock()
}
}
}

@ -237,10 +237,10 @@ func getLeaders() []p2p.Peer {
func CreateWalletServerNode() *node.Node {
configr := client_config.NewConfig()
configr.ReadConfigFile("local_config_shards.txt")
leaders, _ := configr.GetLeadersAndShardIds()
shardIdLeaderMap := configr.GetShardIdToLeaderMap()
clientPeer := configr.GetClientPeer()
walletNode := node.New(nil, nil)
walletNode.Client = client.NewClient(&leaders)
walletNode.Client = client.NewClient(&shardIdLeaderMap)
walletNode.ClientPeer = clientPeer
return walletNode
}
@ -254,7 +254,7 @@ func ExecuteTransaction(tx blockchain.Transaction, walletNode *node.Node) error
}
msg := proto_node.ConstructTransactionListMessage([]*blockchain.Transaction{&tx})
p2p.BroadcastMessage(*walletNode.Client.Leaders, msg)
p2p.BroadcastMessage(walletNode.Client.GetLeaders(), msg)
doneSignal := make(chan int)
go func() {
@ -279,7 +279,7 @@ func ExecuteTransaction(tx blockchain.Transaction, walletNode *node.Node) error
func FetchUtxos(addresses [][20]byte, walletNode *node.Node) (map[uint32]blockchain.UtxoMap, error) {
fmt.Println("Fetching account balance...")
walletNode.Client.ShardUtxoMap = make(map[uint32]blockchain.UtxoMap)
p2p.BroadcastMessage(*walletNode.Client.Leaders, proto_node.ConstructFetchUtxoMessage(*walletNode.ClientPeer, addresses))
p2p.BroadcastMessage(walletNode.Client.GetLeaders(), proto_node.ConstructFetchUtxoMessage(*walletNode.ClientPeer, addresses))
doneSignal := make(chan int)
go func() {

@ -58,7 +58,7 @@ type FetchUtxoMessage struct {
}
// [client] Constructs the unlock to commit or abort message that will be sent to leaders
func ConstructUnlockToCommitOrAbortMessage(txsAndProofs []blockchain.Transaction) []byte {
func ConstructUnlockToCommitOrAbortMessage(txsAndProofs []*blockchain.Transaction) []byte {
byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)})
byteBuffer.WriteByte(byte(TRANSACTION))
byteBuffer.WriteByte(byte(UNLOCK))

Loading…
Cancel
Save