From 0aaffceb4c377d9061b611e09b509847a738103d Mon Sep 17 00:00:00 2001 From: Rongjian Lan Date: Wed, 19 Dec 2018 18:31:30 -0800 Subject: [PATCH] Fix concurrent map write issue in consensus; make txgen run in sync with new blocks for each shard; re-enable the final TPS report --- client/txgen/main.go | 72 ++++++++++++++++------------------- consensus/consensus_leader.go | 5 ++- 2 files changed, 36 insertions(+), 41 deletions(-) diff --git a/client/txgen/main.go b/client/txgen/main.go index 5c98e36de..0cd28df69 100644 --- a/client/txgen/main.go +++ b/client/txgen/main.go @@ -66,7 +66,6 @@ func main() { runtime.GOMAXPROCS(1024) var clientPeer *p2p.Peer - var peers []p2p.Peer var shardIDLeaderMap map[uint32]p2p.Peer var config *client_config.Config @@ -74,7 +73,6 @@ func main() { candidateNode := newnode.New(*ip, *port) BCPeer := p2p.Peer{IP: *bcIP, Port: *bcPort} candidateNode.ContactBeaconChain(BCPeer) - peers = nil clientPeer = &p2p.Peer{IP: *ip, Port: *port} _, pubKey := utils.GenKey(clientPeer.IP, clientPeer.Port) clientPeer.PubKey = pubKey @@ -130,6 +128,12 @@ func main() { clientNode.Client = client.NewClient(clientNode.GetHost(), &shardIDLeaderMap) // This func is used to update the client's utxopool when new blocks are received from the leaders + readySignal := make(chan uint32) + go func() { + for i, _ := range shardIDLeaderMap { + readySignal <- i + } + }() updateBlocksFunc := func(blocks []*blockchain.Block) { for _, block := range blocks { for _, node := range nodes { @@ -149,6 +153,7 @@ func main() { node.UpdateUtxoAndState(block) node.Worker.UpdateCurrent() utxoPoolMutex.Unlock() + readySignal <- shardID } else { continue } @@ -186,39 +191,31 @@ func main() { log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime) break } - shardIDTxsMap := make(map[uint32]types.Transactions) - lock := sync.Mutex{} - var wg sync.WaitGroup - wg.Add(len(shardIDLeaderMap)) - - utxoPoolMutex.Lock() - log.Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0)) - for shardID := range shardIDLeaderMap { // Generate simulated transactions - go func(shardID uint32) { - txs, _ := txgen.GenerateSimulatedTransactionsAccount(int(shardID), nodes, setting) - - // TODO: Put cross shard tx into a pending list waiting for proofs from leaders - - lock.Lock() - // Put txs into corresponding shards - shardIDTxsMap[shardID] = append(shardIDTxsMap[shardID], txs...) - lock.Unlock() - wg.Done() - }(shardID) - } - wg.Wait() - utxoPoolMutex.Unlock() - - lock.Lock() - for shardID, txs := range shardIDTxsMap { // Send the txs to corresponding shards - go func(shardID uint32, txs types.Transactions) { - SendTxsToLeaderAccount(clientNode, shardIDLeaderMap[shardID], txs) - }(shardID, txs) + select { + case shardID := <-readySignal: + shardIDTxsMap := make(map[uint32]types.Transactions) + lock := sync.Mutex{} + + utxoPoolMutex.Lock() + log.Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0)) + txs, _ := txgen.GenerateSimulatedTransactionsAccount(int(shardID), nodes, setting) + + // TODO: Put cross shard tx into a pending list waiting for proofs from leaders + + lock.Lock() + // Put txs into corresponding shards + shardIDTxsMap[shardID] = append(shardIDTxsMap[shardID], txs...) + lock.Unlock() + utxoPoolMutex.Unlock() + + lock.Lock() + for shardID, txs := range shardIDTxsMap { // Send the txs to corresponding shards + go func(shardID uint32, txs types.Transactions) { + SendTxsToLeaderAccount(clientNode, shardIDLeaderMap[shardID], txs) + }(shardID, txs) + } + lock.Unlock() } - lock.Unlock() - - subsetCounter++ - time.Sleep(60000 * time.Millisecond) } } else { for { @@ -277,12 +274,7 @@ func main() { // Send a stop message to stop the nodes at the end msg := proto_node.ConstructStopMessage() - if *peerDiscovery { - peers = clientNode.Consensus.GetValidatorPeers() - } else { - peers = append(config.GetValidators(), clientNode.Client.GetLeaders()...) - } - clientNode.BroadcastMessage(peers, msg) + clientNode.BroadcastMessage(clientNode.Client.GetLeaders(), msg) time.Sleep(3000 * time.Millisecond) } diff --git a/consensus/consensus_leader.go b/consensus/consensus_leader.go index 21046874a..216b9d00b 100644 --- a/consensus/consensus_leader.go +++ b/consensus/consensus_leader.go @@ -144,10 +144,10 @@ func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) { consensus.Log.Debug("Stop encoding block") msgToSend := consensus.constructAnnounceMessage() - host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers) // Set state to AnnounceDone consensus.state = AnnounceDone consensus.commitByLeader(true) + host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers) } // commitByLeader commits to the message itself before receiving others commits @@ -156,6 +156,8 @@ func (consensus *Consensus) commitByLeader(firstRound bool) { secret, commitment := crypto.Commit(crypto.Ed25519Curve) consensus.secret[consensus.consensusID] = secret if firstRound { + consensus.mutex.Lock() + defer consensus.mutex.Unlock() (*consensus.commitments)[consensus.nodeID] = commitment consensus.bitmap.SetKey(consensus.pubKey, true) } else { @@ -257,6 +259,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Sta if targetState == FinalChallengeDone { msgTypeToSend = proto_consensus.FinalChallenge } + msgToSend, challengeScalar, aggCommitment := consensus.constructChallengeMessage(msgTypeToSend) bytes, err := challengeScalar.MarshalBinary() if err != nil {