Fix mutex misuse in pending crosslinks (#2094)

pull/2130/head
Rongjian Lan 5 years ago committed by GitHub
parent d92d9dc15e
commit eca9ca51ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      core/blockchain.go
  2. 2
      node/node.go
  3. 2
      node/node_cross_link.go
  4. 2
      node/node_newblock.go

@ -124,9 +124,10 @@ type BlockChain struct {
scope event.SubscriptionScope scope event.SubscriptionScope
genesisBlock *types.Block genesisBlock *types.Block
mu sync.RWMutex // global mutex for locking chain operations mu sync.RWMutex // global mutex for locking chain operations
chainmu sync.RWMutex // blockchain insertion lock chainmu sync.RWMutex // blockchain insertion lock
procmu sync.RWMutex // block processor lock procmu sync.RWMutex // block processor lock
pendingCrossLinksMutex sync.RWMutex // pending crosslinks lock
checkpoint int // checkpoint counts towards the new checkpoint checkpoint int // checkpoint counts towards the new checkpoint
currentBlock atomic.Value // Current head of the block chain currentBlock atomic.Value // Current head of the block chain
@ -1282,7 +1283,6 @@ func (bc *BlockChain) WriteBlockWithState(
} }
bc.LastContinuousCrossLink(crossLink) bc.LastContinuousCrossLink(crossLink)
// NOTE: Uptime stats is not mission critical code. Should move to offchain server
// Writing validator stats (for uptime recording) for other shards // Writing validator stats (for uptime recording) for other shards
if bc.chainConfig.IsStaking(crossLink.Epoch()) { if bc.chainConfig.IsStaking(crossLink.Epoch()) {
shardState, err := bc.ReadShardState(crossLink.Epoch()) shardState, err := bc.ReadShardState(crossLink.Epoch())
@ -2317,6 +2317,9 @@ func (bc *BlockChain) WritePendingCrossLinks(crossLinks []types.CrossLink) error
// AddPendingCrossLinks appends pending crosslinks // AddPendingCrossLinks appends pending crosslinks
func (bc *BlockChain) AddPendingCrossLinks(pendingCLs []types.CrossLink) (int, error) { func (bc *BlockChain) AddPendingCrossLinks(pendingCLs []types.CrossLink) (int, error) {
bc.pendingCrossLinksMutex.Lock()
defer bc.pendingCrossLinksMutex.Unlock()
cls, err := bc.ReadPendingCrossLinks() cls, err := bc.ReadPendingCrossLinks()
if err != nil || len(cls) == 0 { if err != nil || len(cls) == 0 {
err := bc.WritePendingCrossLinks(pendingCLs) err := bc.WritePendingCrossLinks(pendingCLs)
@ -2329,6 +2332,9 @@ func (bc *BlockChain) AddPendingCrossLinks(pendingCLs []types.CrossLink) (int, e
// DeleteCommittedFromPendingCrossLinks delete pending crosslinks that already committed (i.e. passed in the params) // DeleteCommittedFromPendingCrossLinks delete pending crosslinks that already committed (i.e. passed in the params)
func (bc *BlockChain) DeleteCommittedFromPendingCrossLinks(crossLinks []types.CrossLink) (int, error) { func (bc *BlockChain) DeleteCommittedFromPendingCrossLinks(crossLinks []types.CrossLink) (int, error) {
bc.pendingCrossLinksMutex.Lock()
defer bc.pendingCrossLinksMutex.Unlock()
cls, err := bc.ReadPendingCrossLinks() cls, err := bc.ReadPendingCrossLinks()
if err != nil || len(cls) == 0 { if err != nil || len(cls) == 0 {
return 0, err return 0, err

@ -115,8 +115,6 @@ type Node struct {
pendingCXReceipts map[string]*types.CXReceiptsProof // All the receipts received but not yet processed for Consensus pendingCXReceipts map[string]*types.CXReceiptsProof // All the receipts received but not yet processed for Consensus
pendingCXMutex sync.Mutex pendingCXMutex sync.Mutex
pendingCLMutex sync.Mutex //mutex for read/write pending crosslinks
// Shard databases // Shard databases
shardChains shardchain.Collection shardChains shardchain.Collection

@ -105,9 +105,7 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) {
utils.Logger().Debug(). utils.Logger().Debug().
Msgf("[ProcessingCrossLink] Committing for shardID %d, blockNum %d", cl.ShardID(), cl.Number().Uint64()) Msgf("[ProcessingCrossLink] Committing for shardID %d, blockNum %d", cl.ShardID(), cl.Number().Uint64())
} }
node.pendingCLMutex.Lock()
Len, _ := node.Blockchain().AddPendingCrossLinks(candidates) Len, _ := node.Blockchain().AddPendingCrossLinks(candidates)
node.pendingCLMutex.Unlock()
utils.Logger().Debug(). utils.Logger().Debug().
Msgf("[ProcessingCrossLink] Add pending crosslinks, total pending: %d", Len) Msgf("[ProcessingCrossLink] Add pending crosslinks, total pending: %d", Len)
} }

@ -155,9 +155,7 @@ func (node *Node) proposeNewBlock() (*types.Block, error) {
// Prepare cross links // Prepare cross links
var crossLinksToPropose types.CrossLinks var crossLinksToPropose types.CrossLinks
if node.NodeConfig.ShardID == 0 && node.Blockchain().Config().IsCrossLink(node.Worker.GetCurrentHeader().Epoch()) { if node.NodeConfig.ShardID == 0 && node.Blockchain().Config().IsCrossLink(node.Worker.GetCurrentHeader().Epoch()) {
node.pendingCLMutex.Lock()
allPending, err := node.Blockchain().ReadPendingCrossLinks() allPending, err := node.Blockchain().ReadPendingCrossLinks()
node.pendingCLMutex.Unlock()
if err == nil { if err == nil {
for _, pending := range allPending { for _, pending := range allPending {

Loading…
Cancel
Save