Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ethdb: Add paused write stat #15663

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 125 additions & 59 deletions ethdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,31 @@ import (
"github.com/syndtr/goleveldb/leveldb/iterator"
"github.com/syndtr/goleveldb/leveldb/opt"

"fmt"
gometrics "github.com/rcrowley/go-metrics"
)

var OpenFileLimit = 64
const (
writeDelayNThreshold = 200
writeDelayThreshold = 350 * time.Millisecond
writeDelayWarningThrottler = 1 * time.Minute
)

type LDBDatabase struct {
fn string // filename for reporting
db *leveldb.DB // LevelDB instance

getTimer gometrics.Timer // Timer for measuring the database get request counts and latencies
putTimer gometrics.Timer // Timer for measuring the database put request counts and latencies
delTimer gometrics.Timer // Timer for measuring the database delete request counts and latencies
missMeter gometrics.Meter // Meter for measuring the missed database get requests
readMeter gometrics.Meter // Meter for measuring the database get request data usage
writeMeter gometrics.Meter // Meter for measuring the database put request data usage
compTimeMeter gometrics.Meter // Meter for measuring the total time spent in database compaction
compReadMeter gometrics.Meter // Meter for measuring the data read during compaction
compWriteMeter gometrics.Meter // Meter for measuring the data written during compaction
getTimer gometrics.Timer // Timer for measuring the database get request counts and latencies
putTimer gometrics.Timer // Timer for measuring the database put request counts and latencies
delTimer gometrics.Timer // Timer for measuring the database delete request counts and latencies
missMeter gometrics.Meter // Meter for measuring the missed database get requests
readMeter gometrics.Meter // Meter for measuring the database get request data usage
writeMeter gometrics.Meter // Meter for measuring the database put request data usage
writeDelayNMeter gometrics.Meter // Meter for measuring the write delay number due to database compaction
writeDelayMeter gometrics.Meter // Meter for measuring the write delay duration due to database compaction
compTimeMeter gometrics.Meter // Meter for measuring the total time spent in database compaction
compReadMeter gometrics.Meter // Meter for measuring the data read during compaction
compWriteMeter gometrics.Meter // Meter for measuring the data written during compaction

quitLock sync.Mutex // Mutex protecting the quit channel access
quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
Expand Down Expand Up @@ -175,20 +182,23 @@ func (db *LDBDatabase) LDB() *leveldb.DB {

// Meter configures the database metrics collectors and
func (db *LDBDatabase) Meter(prefix string) {
// Short circuit metering if the metrics system is disabled
if !metrics.Enabled {
return
if metrics.Enabled {
// Initialize all database related metrics collector at the requested prefix
// if metric is enable.
db.getTimer = metrics.NewTimer(prefix + "user/gets")
db.putTimer = metrics.NewTimer(prefix + "user/puts")
db.delTimer = metrics.NewTimer(prefix + "user/dels")
db.missMeter = metrics.NewMeter(prefix + "user/misses")
db.readMeter = metrics.NewMeter(prefix + "user/reads")
db.writeMeter = metrics.NewMeter(prefix + "user/writes")

db.compTimeMeter = metrics.NewMeter(prefix + "compact/time")
db.compReadMeter = metrics.NewMeter(prefix + "compact/input")
db.compWriteMeter = metrics.NewMeter(prefix + "compact/output")
}
// Initialize all the metrics collector at the requested prefix
db.getTimer = metrics.NewTimer(prefix + "user/gets")
db.putTimer = metrics.NewTimer(prefix + "user/puts")
db.delTimer = metrics.NewTimer(prefix + "user/dels")
db.missMeter = metrics.NewMeter(prefix + "user/misses")
db.readMeter = metrics.NewMeter(prefix + "user/reads")
db.writeMeter = metrics.NewMeter(prefix + "user/writes")
db.compTimeMeter = metrics.NewMeter(prefix + "compact/time")
db.compReadMeter = metrics.NewMeter(prefix + "compact/input")
db.compWriteMeter = metrics.NewMeter(prefix + "compact/output")
// Initialize essential write delay metrics no matter metric is enable or not.
db.writeDelayMeter = metrics.NewMeter(prefix + "compact/writedelay/duration")
db.writeDelayNMeter = metrics.NewMeter(prefix + "compact/writedelay/counter")

// Create a quit channel for the periodic collector and run it
db.quitLock.Lock()
Expand All @@ -210,58 +220,114 @@ func (db *LDBDatabase) Meter(prefix string) {
// 2 | 523 | 1000.37159 | 7.26059 | 66.86342 | 66.77884
// 3 | 570 | 1113.18458 | 0.00000 | 0.00000 | 0.00000
func (db *LDBDatabase) meter(refresh time.Duration) {
var (
lastWriteDelay time.Time
lastWriteDelayN time.Time
)
// Create the counters to store current and previous values
counters := make([][]float64, 2)
for i := 0; i < 2; i++ {
counters[i] = make([]float64, 3)
counters[i] = make([]float64, 5)
}
// Iterate ad infinitum and collect the stats
for i := 1; ; i++ {
// Retrieve the database stats
stats, err := db.db.GetProperty("leveldb.stats")
if metrics.Enabled {
// Retrieve the database stats
stats, err := db.db.GetProperty("leveldb.stats")
if err != nil {
db.log.Error("Failed to read database stats", "err", err)
return
}
// Find the compaction table, skip the header
lines := strings.Split(stats, "\n")
for len(lines) > 0 && strings.TrimSpace(lines[0]) != "Compactions" {
lines = lines[1:]
}
if len(lines) <= 3 {
db.log.Error("Compaction table not found")
return
}
lines = lines[3:]

// Iterate over all the table rows, and accumulate the entries
for j := 0; j < len(counters[i%2]); j++ {
counters[i%2][j] = 0
}
for _, line := range lines {
parts := strings.Split(line, "|")
if len(parts) != 6 {
break
}
for idx, counter := range parts[3:] {
value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64)
if err != nil {
db.log.Error("Compaction entry parsing failed", "err", err)
return
}
counters[i%2][idx] += value
}
}
// Update all the requested meters
if db.compTimeMeter != nil {
db.compTimeMeter.Mark(int64((counters[i%2][0] - counters[(i-1)%2][0]) * 1000 * 1000 * 1000))
}
if db.compReadMeter != nil {
db.compReadMeter.Mark(int64((counters[i%2][1] - counters[(i-1)%2][1]) * 1024 * 1024))
}
if db.compWriteMeter != nil {
db.compWriteMeter.Mark(int64((counters[i%2][2] - counters[(i-1)%2][2]) * 1024 * 1024))
}
}

// Gather write delay statistic
writeDelay, err := db.db.GetProperty("leveldb.writedelay")
if err != nil {
db.log.Error("Failed to read database stats", "err", err)
db.log.Error("Failed to read database write delay statistic", "err", err)
return
}
// Find the compaction table, skip the header
lines := strings.Split(stats, "\n")
for len(lines) > 0 && strings.TrimSpace(lines[0]) != "Compactions" {
lines = lines[1:]
var (
delayN int32
delayDuration string
duration time.Duration
)
if n, err := fmt.Sscanf(writeDelay, "DelayN:%d Delay:%s", &delayN, &delayDuration); n != 2 || err != nil {
db.log.Error("Write delay statistic not found")
return
}
if len(lines) <= 3 {
db.log.Error("Compaction table not found")
duration, err = time.ParseDuration(delayDuration)
if err != nil {
db.log.Error("Failed to parse delay duration", "err", err)
return
}
lines = lines[3:]

// Iterate over all the table rows, and accumulate the entries
for j := 0; j < len(counters[i%2]); j++ {
counters[i%2][j] = 0
}
for _, line := range lines {
parts := strings.Split(line, "|")
if len(parts) != 6 {
break
}
for idx, counter := range parts[3:] {
value, err := strconv.ParseFloat(strings.TrimSpace(counter), 64)
if err != nil {
db.log.Error("Compaction entry parsing failed", "err", err)
return
}
counters[i%2][idx] += value
counters[i%2][3], counters[i%2][4] = float64(delayN), float64(duration.Nanoseconds())

if db.writeDelayNMeter != nil {
db.writeDelayNMeter.Mark(int64((counters[i%2][3] - counters[(i-1)%2][3])))
// If the write delay number been collected in the last minute exceeds the predefined threshold,
// print a warning log here.
// If a warning that db performance is laggy has been displayed,
// any subsequent warnings will be withhold for 1 minute to don't overwhelm the user.
if int(db.writeDelayNMeter.Rate1()) > writeDelayNThreshold &&
time.Now().After(lastWriteDelayN.Add(writeDelayWarningThrottler)) {
db.log.Warn("Write delay number exceeds the threshold (200) in the last minute")
lastWriteDelayN = time.Now()
}
}
// Update all the requested meters
if db.compTimeMeter != nil {
db.compTimeMeter.Mark(int64((counters[i%2][0] - counters[(i-1)%2][0]) * 1000 * 1000 * 1000))
}
if db.compReadMeter != nil {
db.compReadMeter.Mark(int64((counters[i%2][1] - counters[(i-1)%2][1]) * 1024 * 1024))
}
if db.compWriteMeter != nil {
db.compWriteMeter.Mark(int64((counters[i%2][2] - counters[(i-1)%2][2]) * 1024 * 1024))

if db.writeDelayMeter != nil {
db.writeDelayMeter.Mark(int64((counters[i%2][4] - counters[(i-1)%2][4])))
// If the write delay duration been collected in the last minute exceeds the predefined threshold,
// print a warning log here.
// If a warning that db performance is laggy has been displayed,
// any subsequent warnings will be withhold for 1 minute to don't overwhelm the user.
if int64(db.writeDelayMeter.Rate1()) > writeDelayThreshold.Nanoseconds() &&
time.Now().After(lastWriteDelay.Add(writeDelayWarningThrottler)) {
db.log.Warn("Write delay duration exceeds the threshold (0.35sec) in the last minute")
lastWriteDelay = time.Now()
}
}

// Sleep a bit, then repeat the stats collection
select {
case errc := <-db.quitChan:
Expand Down
17 changes: 12 additions & 5 deletions vendor/github.com/syndtr/goleveldb/leveldb/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions vendor/github.com/syndtr/goleveldb/leveldb/db_write.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/github.com/syndtr/goleveldb/leveldb/memdb/memdb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion vendor/github.com/syndtr/goleveldb/leveldb/util/util.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions vendor/vendor.json
Original file line number Diff line number Diff line change
Expand Up @@ -358,10 +358,10 @@
"revisionTime": "2017-07-05T02:17:15Z"
},
{
"checksumSHA1": "yHbyLpI/Meh0DGrmi8x6FrDxxUY=",
"checksumSHA1": "l9hsW4atYllqlNjJUwHb6lyLn+I=",
"path": "github.com/syndtr/goleveldb/leveldb",
"revision": "b89cc31ef7977104127d34c1bd31ebd1a9db2199",
"revisionTime": "2017-07-25T06:48:36Z"
"revision": "34011bf325bce385408353a30b101fe5e923eb6e",
"revisionTime": "2017-12-14T12:08:11Z"
},
{
"checksumSHA1": "EKIow7XkgNdWvR/982ffIZxKG8Y=",
Expand Down