remove unnecessary db writing for pending crosslinks (#3751)

* remove unnecessary db writing for crosslinks

* Fix comment and add log

* change comment
pull/3756/head
Rongjian Lan 4 years ago committed by GitHub
parent 3e851a8cca
commit d9489bdb6e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 51
      core/blockchain.go
  2. 4
      node/node_syncing.go

@ -839,6 +839,11 @@ func (bc *BlockChain) Stop() {
if !atomic.CompareAndSwapInt32(&bc.running, 0, 1) {
return
}
if err := bc.SavePendingCrossLinks(); err != nil {
utils.Logger().Error().Err(err).Msg("Failed to save pending cross links")
}
// Unsubscribe all subscriptions registered from blockchain
bc.scope.Close()
close(bc.quit)
@ -2070,9 +2075,11 @@ func (bc *BlockChain) ReadPendingSlashingCandidates() slash.Records {
// ReadPendingCrossLinks retrieves pending crosslinks
func (bc *BlockChain) ReadPendingCrossLinks() ([]types.CrossLink, error) {
cls := []types.CrossLink{}
bytes := []byte{}
if cached, ok := bc.pendingCrossLinksCache.Get(pendingCLCacheKey); ok {
bytes = cached.([]byte)
cls = cached.([]types.CrossLink)
return cls, nil
} else {
by, err := rawdb.ReadPendingCrossLinks(bc.db)
if err != nil || len(by) == 0 {
@ -2080,16 +2087,17 @@ func (bc *BlockChain) ReadPendingCrossLinks() ([]types.CrossLink, error) {
}
bytes = by
}
cls := []types.CrossLink{}
if err := rlp.DecodeBytes(bytes, &cls); err != nil {
utils.Logger().Error().Err(err).Msg("Invalid pending crosslink RLP decoding")
return nil, err
}
bc.pendingCrossLinksCache.Add(pendingCLCacheKey, cls)
return cls, nil
}
// WritePendingCrossLinks saves the pending crosslinks
func (bc *BlockChain) WritePendingCrossLinks(crossLinks []types.CrossLink) error {
// CachePendingCrossLinks caches the pending crosslinks in memory
func (bc *BlockChain) CachePendingCrossLinks(crossLinks []types.CrossLink) error {
// deduplicate crosslinks if any
m := map[uint32]map[uint64]types.CrossLink{}
for _, cl := range crossLinks {
@ -2105,22 +2113,25 @@ func (bc *BlockChain) WritePendingCrossLinks(crossLinks []types.CrossLink) error
cls = append(cls, cl)
}
}
utils.Logger().Debug().Msgf("[WritePendingCrossLinks] Before Dedup has %d cls, after Dedup has %d cls", len(crossLinks), len(cls))
utils.Logger().Debug().Msgf("[CachePendingCrossLinks] Before Dedup has %d cls, after Dedup has %d cls", len(crossLinks), len(cls))
bytes, err := rlp.EncodeToBytes(cls)
if err != nil {
utils.Logger().Error().Msg("[WritePendingCrossLinks] Failed to encode pending crosslinks")
return err
}
if err := rawdb.WritePendingCrossLinks(bc.db, bytes); err != nil {
return err
}
by, err := rlp.EncodeToBytes(cls)
if err == nil {
bc.pendingCrossLinksCache.Add(pendingCLCacheKey, by)
}
bc.pendingCrossLinksCache.Add(pendingCLCacheKey, cls)
return nil
}
// SavePendingCrossLinks saves the pending crosslinks in db
func (bc *BlockChain) SavePendingCrossLinks() error {
if cached, ok := bc.pendingCrossLinksCache.Get(pendingCLCacheKey); ok {
cls := cached.([]types.CrossLink)
bytes, err := rlp.EncodeToBytes(cls)
if err != nil {
return err
}
if err := rawdb.WritePendingCrossLinks(bc.db, bytes); err != nil {
return err
}
}
return nil
}
// AddPendingSlashingCandidates appends pending slashing candidates
@ -2163,11 +2174,11 @@ func (bc *BlockChain) AddPendingCrossLinks(pendingCLs []types.CrossLink) (int, e
cls, err := bc.ReadPendingCrossLinks()
if err != nil || len(cls) == 0 {
err := bc.WritePendingCrossLinks(pendingCLs)
err := bc.CachePendingCrossLinks(pendingCLs)
return len(pendingCLs), err
}
cls = append(cls, pendingCLs...)
err = bc.WritePendingCrossLinks(cls)
err = bc.CachePendingCrossLinks(cls)
return len(cls), err
}
@ -2199,7 +2210,7 @@ func (bc *BlockChain) DeleteFromPendingCrossLinks(crossLinks []types.CrossLink)
}
pendingCLs = append(pendingCLs, cl)
}
err = bc.WritePendingCrossLinks(pendingCLs)
err = bc.CachePendingCrossLinks(pendingCLs)
return len(pendingCLs), err
}

@ -488,6 +488,10 @@ func (node *Node) CalculateResponse(request *downloader_pb.DownloaderRequest, in
}
payloadSize += len(encoded)
if payloadSize > getBlocksRequestHardCap {
utils.Logger().Warn().Err(err).
Int("req size", len(request.Hashes)).
Int("cur size", len(response.Payload)).
Msg("[SYNC] Max blocks response size reached, ignoring the rest.")
break
}
response.Payload = append(response.Payload, encoded)

Loading…
Cancel
Save