Fix concurrent map write issue in consensus; make txgen run in sync with new blocks for each shard; re-enable the final TPS report

pull/171/head
Rongjian Lan 6 years ago
parent e8470b2299
commit 0aaffceb4c
  1. 72
      client/txgen/main.go
  2. 5
      consensus/consensus_leader.go

@ -66,7 +66,6 @@ func main() {
runtime.GOMAXPROCS(1024)
var clientPeer *p2p.Peer
var peers []p2p.Peer
var shardIDLeaderMap map[uint32]p2p.Peer
var config *client_config.Config
@ -74,7 +73,6 @@ func main() {
candidateNode := newnode.New(*ip, *port)
BCPeer := p2p.Peer{IP: *bcIP, Port: *bcPort}
candidateNode.ContactBeaconChain(BCPeer)
peers = nil
clientPeer = &p2p.Peer{IP: *ip, Port: *port}
_, pubKey := utils.GenKey(clientPeer.IP, clientPeer.Port)
clientPeer.PubKey = pubKey
@ -130,6 +128,12 @@ func main() {
clientNode.Client = client.NewClient(clientNode.GetHost(), &shardIDLeaderMap)
// This func is used to update the client's utxopool when new blocks are received from the leaders
readySignal := make(chan uint32)
go func() {
for i, _ := range shardIDLeaderMap {
readySignal <- i
}
}()
updateBlocksFunc := func(blocks []*blockchain.Block) {
for _, block := range blocks {
for _, node := range nodes {
@ -149,6 +153,7 @@ func main() {
node.UpdateUtxoAndState(block)
node.Worker.UpdateCurrent()
utxoPoolMutex.Unlock()
readySignal <- shardID
} else {
continue
}
@ -186,39 +191,31 @@ func main() {
log.Debug("Generator timer ended.", "duration", (int(t.Sub(start))), "startTime", start, "totalTime", totalTime)
break
}
shardIDTxsMap := make(map[uint32]types.Transactions)
lock := sync.Mutex{}
var wg sync.WaitGroup
wg.Add(len(shardIDLeaderMap))
utxoPoolMutex.Lock()
log.Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0))
for shardID := range shardIDLeaderMap { // Generate simulated transactions
go func(shardID uint32) {
txs, _ := txgen.GenerateSimulatedTransactionsAccount(int(shardID), nodes, setting)
// TODO: Put cross shard tx into a pending list waiting for proofs from leaders
lock.Lock()
// Put txs into corresponding shards
shardIDTxsMap[shardID] = append(shardIDTxsMap[shardID], txs...)
lock.Unlock()
wg.Done()
}(shardID)
}
wg.Wait()
utxoPoolMutex.Unlock()
lock.Lock()
for shardID, txs := range shardIDTxsMap { // Send the txs to corresponding shards
go func(shardID uint32, txs types.Transactions) {
SendTxsToLeaderAccount(clientNode, shardIDLeaderMap[shardID], txs)
}(shardID, txs)
select {
case shardID := <-readySignal:
shardIDTxsMap := make(map[uint32]types.Transactions)
lock := sync.Mutex{}
utxoPoolMutex.Lock()
log.Warn("STARTING TX GEN", "gomaxprocs", runtime.GOMAXPROCS(0))
txs, _ := txgen.GenerateSimulatedTransactionsAccount(int(shardID), nodes, setting)
// TODO: Put cross shard tx into a pending list waiting for proofs from leaders
lock.Lock()
// Put txs into corresponding shards
shardIDTxsMap[shardID] = append(shardIDTxsMap[shardID], txs...)
lock.Unlock()
utxoPoolMutex.Unlock()
lock.Lock()
for shardID, txs := range shardIDTxsMap { // Send the txs to corresponding shards
go func(shardID uint32, txs types.Transactions) {
SendTxsToLeaderAccount(clientNode, shardIDLeaderMap[shardID], txs)
}(shardID, txs)
}
lock.Unlock()
}
lock.Unlock()
subsetCounter++
time.Sleep(60000 * time.Millisecond)
}
} else {
for {
@ -277,12 +274,7 @@ func main() {
// Send a stop message to stop the nodes at the end
msg := proto_node.ConstructStopMessage()
if *peerDiscovery {
peers = clientNode.Consensus.GetValidatorPeers()
} else {
peers = append(config.GetValidators(), clientNode.Client.GetLeaders()...)
}
clientNode.BroadcastMessage(peers, msg)
clientNode.BroadcastMessage(clientNode.Client.GetLeaders(), msg)
time.Sleep(3000 * time.Millisecond)
}

@ -144,10 +144,10 @@ func (consensus *Consensus) startConsensus(newBlock *blockchain.Block) {
consensus.Log.Debug("Stop encoding block")
msgToSend := consensus.constructAnnounceMessage()
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers)
// Set state to AnnounceDone
consensus.state = AnnounceDone
consensus.commitByLeader(true)
host.BroadcastMessageFromLeader(consensus.host, consensus.GetValidatorPeers(), msgToSend, consensus.OfflinePeers)
}
// commitByLeader commits to the message itself before receiving others commits
@ -156,6 +156,8 @@ func (consensus *Consensus) commitByLeader(firstRound bool) {
secret, commitment := crypto.Commit(crypto.Ed25519Curve)
consensus.secret[consensus.consensusID] = secret
if firstRound {
consensus.mutex.Lock()
defer consensus.mutex.Unlock()
(*consensus.commitments)[consensus.nodeID] = commitment
consensus.bitmap.SetKey(consensus.pubKey, true)
} else {
@ -257,6 +259,7 @@ func (consensus *Consensus) processCommitMessage(payload []byte, targetState Sta
if targetState == FinalChallengeDone {
msgTypeToSend = proto_consensus.FinalChallenge
}
msgToSend, challengeScalar, aggCommitment := consensus.constructChallengeMessage(msgTypeToSend)
bytes, err := challengeScalar.MarshalBinary()
if err != nil {

Loading…
Cancel
Save