|
|
|
@ -1,9 +1,6 @@ |
|
|
|
|
package db |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"fmt" |
|
|
|
|
"strconv" |
|
|
|
|
"strings" |
|
|
|
|
"sync" |
|
|
|
|
"time" |
|
|
|
|
|
|
|
|
@ -128,157 +125,7 @@ func (db *LDBDatabase) LDB() *leveldb.DB { |
|
|
|
|
return db.db |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// meter periodically retrieves internal leveldb counters and reports them to
|
|
|
|
|
// the metrics subsystem.
|
|
|
|
|
//
|
|
|
|
|
// This is how a stats table look like (currently):
|
|
|
|
|
// Compactions
|
|
|
|
|
// Level | Tables | Size(MB) | Time(sec) | Read(MB) | Write(MB)
|
|
|
|
|
// -------+------------+---------------+---------------+---------------+---------------
|
|
|
|
|
// 0 | 0 | 0.00000 | 1.27969 | 0.00000 | 12.31098
|
|
|
|
|
// 1 | 85 | 109.27913 | 28.09293 | 213.92493 | 214.26294
|
|
|
|
|
// 2 | 523 | 1000.37159 | 7.26059 | 66.86342 | 66.77884
|
|
|
|
|
// 3 | 570 | 1113.18458 | 0.00000 | 0.00000 | 0.00000
|
|
|
|
|
//
|
|
|
|
|
// This is how the write delay look like (currently):
|
|
|
|
|
// DelayN:5 Delay:406.604657ms Paused: false
|
|
|
|
|
//
|
|
|
|
|
// This is how the iostats look like (currently):
|
|
|
|
|
// Read(MB):3895.04860 Write(MB):3654.64712
|
|
|
|
|
func (db *LDBDatabase) meter(refresh time.Duration) { |
|
|
|
|
// Create the counters to store current and previous compaction values
|
|
|
|
|
compactions := make([][]float64, 2) |
|
|
|
|
for i := 0; i < 2; i++ { |
|
|
|
|
compactions[i] = make([]float64, 3) |
|
|
|
|
} |
|
|
|
|
// Create storage for iostats.
|
|
|
|
|
var iostats [2]float64 |
|
|
|
|
|
|
|
|
|
// Create storage and warning log tracer for write delay.
|
|
|
|
|
var ( |
|
|
|
|
delaystats [2]int64 |
|
|
|
|
lastWritePaused time.Time |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
var ( |
|
|
|
|
errc chan error |
|
|
|
|
merr error |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// Iterate ad infinitum and collect the stats
|
|
|
|
|
for i := 1; errc == nil && merr == nil; i++ { |
|
|
|
|
// Retrieve the database stats
|
|
|
|
|
stats, err := db.db.GetProperty("leveldb.stats") |
|
|
|
|
if err != nil { |
|
|
|
|
db.log.Error("Failed to read database stats", "err", err) |
|
|
|
|
merr = err |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
// Find the compaction table, skip the header
|
|
|
|
|
lines := strings.Split(stats, "\n") |
|
|
|
|
for len(lines) > 0 && strings.TrimSpace(lines[0]) != "Compactions" { |
|
|
|
|
lines = lines[1:] |
|
|
|
|
} |
|
|
|
|
if len(lines) <= 3 { |
|
|
|
|
db.log.Error("Compaction table not found") |
|
|
|
|
merr = errors.New("compaction table not found") |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
lines = lines[3:] |
|
|
|
|
|
|
|
|
|
// Iterate over all the table rows, and accumulate the entries
|
|
|
|
|
for j := 0; j < len(compactions[i%2]); j++ { |
|
|
|
|
compactions[i%2][j] = 0 |
|
|
|
|
} |
|
|
|
|
for _, line := range lines { |
|
|
|
|
parts := strings.Split(line, "|") |
|
|
|
|
if len(parts) != 6 { |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
for idx, counter := range parts[3:] { |
|
|
|
|
value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64) |
|
|
|
|
if err != nil { |
|
|
|
|
db.log.Error("Compaction entry parsing failed", "err", err) |
|
|
|
|
merr = err |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
compactions[i%2][idx] += value |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Retrieve the write delay statistic
|
|
|
|
|
writedelay, err := db.db.GetProperty("leveldb.writedelay") |
|
|
|
|
if err != nil { |
|
|
|
|
db.log.Error("Failed to read database write delay statistic", "err", err) |
|
|
|
|
merr = err |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
var ( |
|
|
|
|
delayN int64 |
|
|
|
|
delayDuration string |
|
|
|
|
duration time.Duration |
|
|
|
|
paused bool |
|
|
|
|
) |
|
|
|
|
if n, err := fmt.Sscanf(writedelay, "DelayN:%d Delay:%s Paused:%t", &delayN, &delayDuration, &paused); n != 3 || err != nil { |
|
|
|
|
db.log.Error("Write delay statistic not found") |
|
|
|
|
merr = err |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
duration, err = time.ParseDuration(delayDuration) |
|
|
|
|
if err != nil { |
|
|
|
|
db.log.Error("Failed to parse delay duration", "err", err) |
|
|
|
|
merr = err |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
// If a warning that db is performing compaction has been displayed, any subsequent
|
|
|
|
|
// warnings will be withheld for one minute not to overwhelm the user.
|
|
|
|
|
if paused && delayN-delaystats[0] == 0 && duration.Nanoseconds()-delaystats[1] == 0 && |
|
|
|
|
time.Now().After(lastWritePaused.Add(writePauseWarningThrottler)) { |
|
|
|
|
db.log.Warn("Database compacting, degraded performance") |
|
|
|
|
lastWritePaused = time.Now() |
|
|
|
|
} |
|
|
|
|
delaystats[0], delaystats[1] = delayN, duration.Nanoseconds() |
|
|
|
|
|
|
|
|
|
// Retrieve the database iostats.
|
|
|
|
|
ioStats, err := db.db.GetProperty("leveldb.iostats") |
|
|
|
|
if err != nil { |
|
|
|
|
db.log.Error("Failed to read database iostats", "err", err) |
|
|
|
|
merr = err |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
var nRead, nWrite float64 |
|
|
|
|
parts := strings.Split(ioStats, " ") |
|
|
|
|
if len(parts) < 2 { |
|
|
|
|
db.log.Error("Bad syntax of ioStats", "ioStats", ioStats) |
|
|
|
|
merr = fmt.Errorf("bad syntax of ioStats %s", ioStats) |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
if n, err := fmt.Sscanf(parts[0], "Read(MB):%f", &nRead); n != 1 || err != nil { |
|
|
|
|
db.log.Error("Bad syntax of read entry", "entry", parts[0]) |
|
|
|
|
merr = err |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
if n, err := fmt.Sscanf(parts[1], "Write(MB):%f", &nWrite); n != 1 || err != nil { |
|
|
|
|
db.log.Error("Bad syntax of write entry", "entry", parts[1]) |
|
|
|
|
merr = err |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
iostats[0], iostats[1] = nRead, nWrite |
|
|
|
|
|
|
|
|
|
// Sleep a bit, then repeat the stats collection
|
|
|
|
|
select { |
|
|
|
|
case errc = <-db.quitChan: |
|
|
|
|
// Quit requesting, stop hammering the database
|
|
|
|
|
case <-time.After(refresh): |
|
|
|
|
// Timeout, gather a new set of stats
|
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if errc == nil { |
|
|
|
|
errc = <-db.quitChan |
|
|
|
|
} |
|
|
|
|
errc <- merr |
|
|
|
|
} |
|
|
|
|
// TODO(minhdoan): Might add meter func from ethereum-go repo
|
|
|
|
|
|
|
|
|
|
func (db *LDBDatabase) NewBatch() Batch { |
|
|
|
|
return &ldbBatch{db: db.db, b: new(leveldb.Batch)} |
|
|
|
|