diff --git a/core/blockchain.go b/core/blockchain.go index 078260d50..44fa52466 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -124,9 +124,10 @@ type BlockChain struct { scope event.SubscriptionScope genesisBlock *types.Block - mu sync.RWMutex // global mutex for locking chain operations - chainmu sync.RWMutex // blockchain insertion lock - procmu sync.RWMutex // block processor lock + mu sync.RWMutex // global mutex for locking chain operations + chainmu sync.RWMutex // blockchain insertion lock + procmu sync.RWMutex // block processor lock + pendingCrossLinksMutex sync.RWMutex // pending crosslinks lock checkpoint int // checkpoint counts towards the new checkpoint currentBlock atomic.Value // Current head of the block chain @@ -1282,7 +1283,6 @@ func (bc *BlockChain) WriteBlockWithState( } bc.LastContinuousCrossLink(crossLink) - // NOTE: Uptime stats is not mission critical code. Should move to offchain server // Writing validator stats (for uptime recording) for other shards if bc.chainConfig.IsStaking(crossLink.Epoch()) { shardState, err := bc.ReadShardState(crossLink.Epoch()) @@ -2317,6 +2317,9 @@ func (bc *BlockChain) WritePendingCrossLinks(crossLinks []types.CrossLink) error // AddPendingCrossLinks appends pending crosslinks func (bc *BlockChain) AddPendingCrossLinks(pendingCLs []types.CrossLink) (int, error) { + bc.pendingCrossLinksMutex.Lock() + defer bc.pendingCrossLinksMutex.Unlock() + cls, err := bc.ReadPendingCrossLinks() if err != nil || len(cls) == 0 { err := bc.WritePendingCrossLinks(pendingCLs) @@ -2329,6 +2332,9 @@ func (bc *BlockChain) AddPendingCrossLinks(pendingCLs []types.CrossLink) (int, e // DeleteCommittedFromPendingCrossLinks delete pending crosslinks that already committed (i.e. passed in the params) func (bc *BlockChain) DeleteCommittedFromPendingCrossLinks(crossLinks []types.CrossLink) (int, error) { + bc.pendingCrossLinksMutex.Lock() + defer bc.pendingCrossLinksMutex.Unlock() + cls, err := bc.ReadPendingCrossLinks() if err != nil || len(cls) == 0 { return 0, err diff --git a/node/node.go b/node/node.go index d39de6d4e..dcc0bd13b 100644 --- a/node/node.go +++ b/node/node.go @@ -115,8 +115,6 @@ type Node struct { pendingCXReceipts map[string]*types.CXReceiptsProof // All the receipts received but not yet processed for Consensus pendingCXMutex sync.Mutex - pendingCLMutex sync.Mutex //mutex for read/write pending crosslinks - // Shard databases shardChains shardchain.Collection diff --git a/node/node_cross_link.go b/node/node_cross_link.go index 4ba5fe89d..7df5d7a62 100644 --- a/node/node_cross_link.go +++ b/node/node_cross_link.go @@ -105,9 +105,7 @@ func (node *Node) ProcessCrossLinkMessage(msgPayload []byte) { utils.Logger().Debug(). Msgf("[ProcessingCrossLink] Committing for shardID %d, blockNum %d", cl.ShardID(), cl.Number().Uint64()) } - node.pendingCLMutex.Lock() Len, _ := node.Blockchain().AddPendingCrossLinks(candidates) - node.pendingCLMutex.Unlock() utils.Logger().Debug(). Msgf("[ProcessingCrossLink] Add pending crosslinks, total pending: %d", Len) } diff --git a/node/node_newblock.go b/node/node_newblock.go index 3516bed93..897dab07e 100644 --- a/node/node_newblock.go +++ b/node/node_newblock.go @@ -155,9 +155,7 @@ func (node *Node) proposeNewBlock() (*types.Block, error) { // Prepare cross links var crossLinksToPropose types.CrossLinks if node.NodeConfig.ShardID == 0 && node.Blockchain().Config().IsCrossLink(node.Worker.GetCurrentHeader().Epoch()) { - node.pendingCLMutex.Lock() allPending, err := node.Blockchain().ReadPendingCrossLinks() - node.pendingCLMutex.Unlock() if err == nil { for _, pending := range allPending {