diff --git a/pump/storage/vlog.go b/pump/storage/vlog.go index 964c06467..7093c7b2c 100644 --- a/pump/storage/vlog.go +++ b/pump/storage/vlog.go @@ -11,6 +11,7 @@ import ( "strings" "sync" "sync/atomic" + "time" "github.com/ngaut/log" "github.com/pingcap/errors" @@ -303,7 +304,22 @@ func (vlog *valueLog) write(reqs []*request) error { var bufReqs []*request + rotate := func() error { + err := curFile.finalize() + if err != nil { + return errors.Annotatef(err, "finalize file %s failed", curFile.path) + } + + id := atomic.AddUint32(&vlog.maxFid, 1) + curFile, err = vlog.createLogFile(id) + if err != nil { + return errors.Annotatef(err, "create file id %d failed", id) + } + return nil + } + toDisk := func() error { + writeT0 := time.Now() n, err := curFile.fd.Write(vlog.buf.Bytes()) atomic.AddInt64(&vlog.writableLogOffset, int64(n)) @@ -311,11 +327,14 @@ func (vlog *valueLog) write(reqs []*request) error { return errors.Annotatef(err, "unable to write to log file: %s", curFile.path) } if vlog.sync { + fsyncT0 := time.Now() err = curFile.fdatasync() + writeBinlogTimeHistogram.WithLabelValues("fsync").Observe(time.Since(fsyncT0).Seconds()) if err != nil { return errors.Annotatef(err, "fdatasync file %s failed", curFile.path) } } + writeBinlogTimeHistogram.WithLabelValues("to_disk").Observe(time.Since(writeT0).Seconds()) for _, req := range bufReqs { curFile.updateMaxTS(req.ts()) @@ -325,15 +344,11 @@ func (vlog *valueLog) write(reqs []*request) error { // rotate file if vlog.writableOffset() > vlog.opt.ValueLogFileSize { - err := curFile.finalize() - if err != nil { - return errors.Annotatef(err, "finalize file %s failed", curFile.path) - } - - id := atomic.AddUint32(&vlog.maxFid, 1) - curFile, err = vlog.createLogFile(id) + rotateT0 := time.Now() + err := rotate() + writeBinlogTimeHistogram.WithLabelValues("rotate").Observe(time.Since(rotateT0).Seconds()) if err != nil { - return errors.Annotatef(err, "create file id %d failed", id) + return err } } return nil