diff --git a/internal/unionstore/pipelined_memdb.go b/internal/unionstore/pipelined_memdb.go index ffdf804f8d..ddc8c69b70 100644 --- a/internal/unionstore/pipelined_memdb.go +++ b/internal/unionstore/pipelined_memdb.go @@ -55,6 +55,9 @@ type PipelinedMemDB struct { // Some([]) -> delete batchGetCache map[string]util.Option[[]byte] memChangeHook func(uint64) + + // metrics + flushWaitDuration time.Duration } const ( @@ -358,12 +361,14 @@ func (p *PipelinedMemDB) needFlush() bool { // FlushWait will wait for all flushing tasks are done and return the error if there is a failure. func (p *PipelinedMemDB) FlushWait() error { if p.flushingMemDB != nil { + now := time.Now() err := <-p.errCh if err != nil { err = p.handleAlreadyExistErr(err) } // cleanup the flushingMemDB so the next call of FlushWait will not wait for the error channel. p.flushingMemDB = nil + p.flushWaitDuration += time.Since(now) return err } return nil @@ -513,3 +518,10 @@ func (p *PipelinedMemDB) Checkpoint() *MemDBCheckpoint { func (p *PipelinedMemDB) RevertToCheckpoint(*MemDBCheckpoint) { panic("RevertToCheckpoint is not supported for PipelinedMemDB") } + +// GetFlushMetrics implements MemBuffer interface. +func (p *PipelinedMemDB) GetFlushMetrics() FlushMetrics { + return FlushMetrics{ + WaitDuration: p.flushWaitDuration, + } +} diff --git a/internal/unionstore/union_store.go b/internal/unionstore/union_store.go index d0ada377c6..b2d4da6c55 100644 --- a/internal/unionstore/union_store.go +++ b/internal/unionstore/union_store.go @@ -37,6 +37,7 @@ package unionstore import ( "context" "math" + "time" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/kv" @@ -236,6 +237,12 @@ type MemBuffer interface { Flush(force bool) (bool, error) // FlushWait waits for the flushing task done and return error. FlushWait() error + // GetFlushDetails returns the metrics related to flushing + GetFlushMetrics() FlushMetrics +} + +type FlushMetrics struct { + WaitDuration time.Duration } var ( @@ -287,3 +294,6 @@ func (db *MemDBWithContext) BatchGet(ctx context.Context, keys [][]byte) (map[st } return m, nil } + +// GetFlushMetrisc implements the MemBuffer interface. +func (db *MemDBWithContext) GetFlushMetrics() FlushMetrics { return FlushMetrics{} } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index aea701f2df..0ee2b5f215 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -853,6 +853,7 @@ type TxnInfo struct { OnePCFallback bool `json:"one_pc_fallback"` ErrMsg string `json:"error,omitempty"` Pipelined bool `json:"pipelined"` + FlushWaitMs int64 `json:"flush_wait_ms"` } func (txn *KVTxn) onCommitted(err error) { @@ -875,6 +876,7 @@ func (txn *KVTxn) onCommitted(err error) { AsyncCommitFallback: txn.committer.hasTriedAsyncCommit && !isAsyncCommit, OnePCFallback: txn.committer.hasTriedOnePC && !isOnePC, Pipelined: txn.IsPipelined(), + FlushWaitMs: txn.GetMemBuffer().GetFlushMetrics().WaitDuration.Milliseconds(), } if err != nil { info.ErrMsg = err.Error()