diff --git a/client/btctxgen/main.go b/client/btctxgen/main.go index 0aae65b44..0c23a5d4b 100644 --- a/client/btctxgen/main.go +++ b/client/btctxgen/main.go @@ -137,12 +137,12 @@ LOOP: return txs, crossTxs } -func initClient(clientNode *node.Node, clientPort string, leaders *[]p2p.Peer, nodes *[]*node.Node) { +func initClient(clientNode *node.Node, clientPort string, shardIdLeaderMap *map[uint32]p2p.Peer, nodes *[]*node.Node) { if clientPort == "" { return } - clientNode.Client = client.NewClient(leaders) + clientNode.Client = client.NewClient(shardIdLeaderMap) // This func is used to update the client's utxopool when new blocks are received from the leaders updateBlocksFunc := func(blocks []*blockchain.Block) { @@ -178,10 +178,10 @@ func main() { // Read the configs config := client_config.NewConfig() config.ReadConfigFile(*configFile) - leaders, shardIDs := config.GetLeadersAndShardIds() + shardIdLeaderMap := config.GetShardIdToLeaderMap() // Do cross shard tx if there are more than one shard - setting.crossShard = len(shardIDs) > 1 + setting.crossShard = len(shardIdLeaderMap) > 1 setting.maxNumTxsPerBatch = *maxNumTxsPerBatch // TODO(Richard): refactor this chuck to a single method @@ -199,7 +199,7 @@ func main() { // Nodes containing utxopools to mirror the shards' data in the network nodes := []*node.Node{} - for _, shardID := range shardIDs { + for shardID, _ := range shardIdLeaderMap { nodes = append(nodes, node.New(&consensus.Consensus{ShardID: shardID}, nil)) } @@ -208,13 +208,18 @@ func main() { consensusObj := consensus.NewConsensus("0", clientPort, "0", nil, p2p.Peer{}) clientNode := node.New(consensusObj, nil) - initClient(clientNode, clientPort, &leaders, &nodes) + initClient(clientNode, clientPort, &shardIdLeaderMap, &nodes) // Transaction generation process time.Sleep(10 * time.Second) // wait for nodes to be ready start := time.Now() totalTime := 300.0 //run for 5 minutes + leaders := []p2p.Peer{} + for _, leader := range shardIdLeaderMap { + leaders = append(leaders, leader) + } + for true { t := time.Now() if t.Sub(start).Seconds() >= totalTime { @@ -224,8 +229,8 @@ func main() { allCrossTxs := []*blockchain.Transaction{} // Generate simulated transactions - for i, leader := range leaders { - txs, crossTxs := generateSimulatedTransactions(i, nodes) + for shardId, leader := range shardIdLeaderMap { + txs, crossTxs := generateSimulatedTransactions(int(shardId), nodes) allCrossTxs = append(allCrossTxs, crossTxs...) log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs), "numCrossTxs", len(crossTxs), "block height", btcTXIter.GetBlockIndex()) diff --git a/client/client.go b/client/client.go index 0c287a39d..e2c4f6c1a 100644 --- a/client/client.go +++ b/client/client.go @@ -16,7 +16,7 @@ import ( type Client struct { PendingCrossTxs map[[32]byte]*blockchain.Transaction // Map of TxId to pending cross shard txs. Pending means the proof-of-accept/rejects are not complete PendingCrossTxsMutex sync.Mutex // Mutex for the pending txs list - Leaders *[]p2p.Peer // All the leaders for each shard + Leaders *map[uint32]p2p.Peer // Map of shard Id and corresponding leader UpdateBlocks func([]*blockchain.Block) // Closure function used to sync new block with the leader. Once the leader finishes the consensus on a new block, it will send it to the clients. Clients use this method to update their blockchain ShardUtxoMap map[uint32]blockchain.UtxoMap @@ -56,7 +56,7 @@ func (client *Client) TransactionMessageHandler(msgPayload []byte) { // 3) checks whether all input utxos of the transaction have a corresponding proof. // 4) for all transactions with full proofs, broadcast them back to the leaders func (client *Client) handleProofOfLockMessage(proofs *[]blockchain.CrossShardTxProof) { - txsToSend := []blockchain.Transaction{} + txsToSend := []*blockchain.Transaction{} //fmt.Printf("PENDING CLIENT TX - %d\n", len(client.PendingCrossTxs)) // Loop through the newly received list of proofs @@ -89,7 +89,7 @@ func (client *Client) handleProofOfLockMessage(proofs *[]blockchain.CrossShardTx } if readyToUnlock { - txsToSend = append(txsToSend, *txAndProofs) + txsToSend = append(txsToSend, txAndProofs) } } @@ -101,7 +101,7 @@ func (client *Client) handleProofOfLockMessage(proofs *[]blockchain.CrossShardTx // Broadcast the cross txs with full proofs for unlock-to-commit/abort if len(txsToSend) != 0 { - client.broadcastCrossShardTxUnlockMessage(&txsToSend) + client.sendCrossShardTxUnlockMessage(txsToSend) } } @@ -115,12 +115,14 @@ func (client *Client) handleFetchUtxoResponseMessage(utxoResponse client_proto.F client.ShardUtxoMap[utxoResponse.ShardId] = utxoResponse.UtxoMap } -func (client *Client) broadcastCrossShardTxUnlockMessage(txsToSend *[]blockchain.Transaction) { - p2p.BroadcastMessage(*client.Leaders, node.ConstructUnlockToCommitOrAbortMessage(*txsToSend)) +func (client *Client) sendCrossShardTxUnlockMessage(txsToSend []*blockchain.Transaction) { + for shardId, txs := range BuildOutputShardTransactionMap(txsToSend) { + p2p.SendMessage((*client.Leaders)[shardId], node.ConstructUnlockToCommitOrAbortMessage(txs)) + } } // Create a new Client -func NewClient(leaders *[]p2p.Peer) *Client { +func NewClient(leaders *map[uint32]p2p.Peer) *Client { client := Client{} client.PendingCrossTxs = make(map[[32]byte]*blockchain.Transaction) client.Leaders = leaders @@ -129,3 +131,39 @@ func NewClient(leaders *[]p2p.Peer) *Client { client.log = log.New() return &client } + +func BuildOutputShardTransactionMap(txs []*blockchain.Transaction) map[uint32][]*blockchain.Transaction { + txsShardMap := make(map[uint32][]*blockchain.Transaction) + + // Put txs into corresponding output shards + for _, crossTx := range txs { + for curShardId, _ := range GetOutputShardIdsOfCrossShardTx(crossTx) { + txsShardMap[curShardId] = append(txsShardMap[curShardId], crossTx) + } + } + return txsShardMap +} + +func GetInputShardIdsOfCrossShardTx(crossTx *blockchain.Transaction) map[uint32]bool { + shardIds := map[uint32]bool{} + for _, txInput := range crossTx.TxInput { + shardIds[txInput.ShardID] = true + } + return shardIds +} + +func GetOutputShardIdsOfCrossShardTx(crossTx *blockchain.Transaction) map[uint32]bool { + shardIds := map[uint32]bool{} + for _, txOutput := range crossTx.TxOutput { + shardIds[txOutput.ShardID] = true + } + return shardIds +} + +func (client *Client) GetLeaders() []p2p.Peer { + leaders := []p2p.Peer{} + for _, leader := range *client.Leaders { + leaders = append(leaders, leader) + } + return leaders +} diff --git a/client/config/config.go b/client/config/config.go index 40aa35b53..2e4978d69 100644 --- a/client/config/config.go +++ b/client/config/config.go @@ -40,21 +40,19 @@ func (config *Config) GetValidators() []p2p.Peer { } // Gets all the leader peers and corresponding shard Ids -func (config *Config) GetLeadersAndShardIds() ([]p2p.Peer, []uint32) { - var peerList []p2p.Peer - var shardIDs []uint32 +func (config *Config) GetShardIdToLeaderMap() map[uint32]p2p.Peer { + shardIdLeaderMap := map[uint32]p2p.Peer{} for _, entry := range config.config { if entry.Role == "leader" { - peerList = append(peerList, p2p.Peer{Ip: entry.IP, Port: entry.Port}) val, err := strconv.Atoi(entry.ShardID) if err == nil { - shardIDs = append(shardIDs, uint32(val)) + shardIdLeaderMap[uint32(val)] = p2p.Peer{Ip: entry.IP, Port: entry.Port} } else { log.Print("[Generator] Error parsing the shard Id ", entry.ShardID) } } } - return peerList, shardIDs + return shardIdLeaderMap } func (config *Config) GetClientPeer() *p2p.Peer { diff --git a/client/txgen/main.go b/client/txgen/main.go index becc2aef5..0ed4f3d6b 100644 --- a/client/txgen/main.go +++ b/client/txgen/main.go @@ -254,11 +254,11 @@ func main() { // Read the configs config := client_config.NewConfig() config.ReadConfigFile(*configFile) - leaders, shardIds := config.GetLeadersAndShardIds() + shardIdLeaderMap := config.GetShardIdToLeaderMap() setting.numOfAddress = 10000 // Do cross shard tx if there are more than one shard - setting.crossShard = len(shardIds) > 1 + setting.crossShard = len(shardIdLeaderMap) > 1 setting.maxNumTxsPerBatch = *maxNumTxsPerBatch setting.crossShardRatio = *crossShardRatio @@ -273,7 +273,7 @@ func main() { // Nodes containing utxopools to mirror the shards' data in the network nodes := []*node.Node{} - for _, shardId := range shardIds { + for shardId, _ := range shardIdLeaderMap { node := node.New(&consensus.Consensus{ShardID: shardId}, nil) // Assign many fake addresses so we have enough address to play with at first node.AddTestingAddresses(setting.numOfAddress) @@ -286,7 +286,7 @@ func main() { clientNode := node.New(consensusObj, nil) if clientPort != "" { - clientNode.Client = client.NewClient(&leaders) + clientNode.Client = client.NewClient(&shardIdLeaderMap) // This func is used to update the client's utxopool when new blocks are received from the leaders updateBlocksFunc := func(blocks []*blockchain.Block) { @@ -319,53 +319,51 @@ func main() { start := time.Now() totalTime := float64(*duration) - batchCounter := 0 + subsetCounter := 0 for true { t := time.Now() if totalTime > 0 && t.Sub(start).Seconds() >= totalTime { log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime) break } - for shardId := 0; shardId < len(nodes); shardId++ { - constructAndSendTransaction(batchCounter, *numSubset, shardId, leaders, nodes, clientNode, clientPort) + shardIdTxsMap := make(map[uint32][]*blockchain.Transaction) + for shardId, _ := range shardIdLeaderMap { // Generate simulated transactions + txs, crossTxs := generateSimulatedTransactions(subsetCounter, *numSubset, int(shardId), nodes) + + // Put cross shard tx into a pending list waiting for proofs from leaders + if clientPort != "" { + clientNode.Client.PendingCrossTxsMutex.Lock() + for _, tx := range crossTxs { + clientNode.Client.PendingCrossTxs[tx.ID] = tx + } + clientNode.Client.PendingCrossTxsMutex.Unlock() + } + + // Put txs into corresponding shards + shardIdTxsMap[shardId] = append(shardIdTxsMap[shardId], txs...) + for _, crossTx := range crossTxs { + for curShardId, _ := range client.GetInputShardIdsOfCrossShardTx(crossTx) { + shardIdTxsMap[curShardId] = append(shardIdTxsMap[curShardId], crossTx) + } + } + } + + for shardId, txs := range shardIdTxsMap { // Send the txs to corresponding shards + SendTxsToLeader(shardIdLeaderMap[shardId], txs) } - batchCounter++ - time.Sleep(5000 * time.Millisecond) + + subsetCounter++ + time.Sleep(2000 * time.Millisecond) } // Send a stop message to stop the nodes at the end msg := proto_node.ConstructStopMessage() - peers := append(config.GetValidators(), leaders...) + peers := append(config.GetValidators(), clientNode.Client.GetLeaders()...) p2p.BroadcastMessage(peers, msg) } -func constructAndSendTransaction(subsetId, numSubset, shardId int, leaders []p2p.Peer, nodes []*node.Node, clientNode *node.Node, clientPort string) { - allCrossTxs := []*blockchain.Transaction{} - // Generate simulated transactions - leader := leaders[shardId] - - txs, crossTxs := generateSimulatedTransactions(subsetId, numSubset, shardId, nodes) - allCrossTxs = append(allCrossTxs, crossTxs...) - - log.Debug("[Generator] Sending single-shard txs ...", "leader", leader, "numTxs", len(txs)) +func SendTxsToLeader(leader p2p.Peer, txs []*blockchain.Transaction) { + log.Debug("[Generator] Sending txs to...", "leader", leader, "numTxs", len(txs)) msg := proto_node.ConstructTransactionListMessage(txs) p2p.SendMessage(leader, msg) - // Note cross shard txs are later sent in batch - - if len(allCrossTxs) > 0 { - log.Debug("[Generator] Broadcasting cross-shard txs ...", "allCrossTxs", len(allCrossTxs)) - //fmt.Printf("SENDING CLIENT TXS: %d\n", shardId) - //fmt.Println(allCrossTxs) - msg := proto_node.ConstructTransactionListMessage(allCrossTxs) - p2p.BroadcastMessage(leaders, msg) - - // Put cross shard tx into a pending list waiting for proofs from leaders - if clientPort != "" { - clientNode.Client.PendingCrossTxsMutex.Lock() - for _, tx := range allCrossTxs { - clientNode.Client.PendingCrossTxs[tx.ID] = tx - } - clientNode.Client.PendingCrossTxsMutex.Unlock() - } - } } diff --git a/client/wallet/main.go b/client/wallet/main.go index 0e1677525..01c2d9423 100644 --- a/client/wallet/main.go +++ b/client/wallet/main.go @@ -237,10 +237,10 @@ func getLeaders() []p2p.Peer { func CreateWalletServerNode() *node.Node { configr := client_config.NewConfig() configr.ReadConfigFile("local_config_shards.txt") - leaders, _ := configr.GetLeadersAndShardIds() + shardIdLeaderMap := configr.GetShardIdToLeaderMap() clientPeer := configr.GetClientPeer() walletNode := node.New(nil, nil) - walletNode.Client = client.NewClient(&leaders) + walletNode.Client = client.NewClient(&shardIdLeaderMap) walletNode.ClientPeer = clientPeer return walletNode } @@ -254,7 +254,7 @@ func ExecuteTransaction(tx blockchain.Transaction, walletNode *node.Node) error } msg := proto_node.ConstructTransactionListMessage([]*blockchain.Transaction{&tx}) - p2p.BroadcastMessage(*walletNode.Client.Leaders, msg) + p2p.BroadcastMessage(walletNode.Client.GetLeaders(), msg) doneSignal := make(chan int) go func() { @@ -279,7 +279,7 @@ func ExecuteTransaction(tx blockchain.Transaction, walletNode *node.Node) error func FetchUtxos(addresses [][20]byte, walletNode *node.Node) (map[uint32]blockchain.UtxoMap, error) { fmt.Println("Fetching account balance...") walletNode.Client.ShardUtxoMap = make(map[uint32]blockchain.UtxoMap) - p2p.BroadcastMessage(*walletNode.Client.Leaders, proto_node.ConstructFetchUtxoMessage(*walletNode.ClientPeer, addresses)) + p2p.BroadcastMessage(walletNode.Client.GetLeaders(), proto_node.ConstructFetchUtxoMessage(*walletNode.ClientPeer, addresses)) doneSignal := make(chan int) go func() { diff --git a/proto/node/node.go b/proto/node/node.go index 4bba05cec..9c7564046 100644 --- a/proto/node/node.go +++ b/proto/node/node.go @@ -58,7 +58,7 @@ type FetchUtxoMessage struct { } // [client] Constructs the unlock to commit or abort message that will be sent to leaders -func ConstructUnlockToCommitOrAbortMessage(txsAndProofs []blockchain.Transaction) []byte { +func ConstructUnlockToCommitOrAbortMessage(txsAndProofs []*blockchain.Transaction) []byte { byteBuffer := bytes.NewBuffer([]byte{byte(proto.NODE)}) byteBuffer.WriteByte(byte(TRANSACTION)) byteBuffer.WriteByte(byte(UNLOCK))