From 4e53c6787ca2cb59780c52afb06bc089d905a73f Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 14 May 2024 17:57:16 +0800 Subject: [PATCH 1/2] feat: record flush_wait_ms in TxnInfo Signed-off-by: ekexium --- internal/unionstore/pipelined_memdb.go | 10 ++++++++++ internal/unionstore/union_store.go | 6 ++++++ txnkv/transaction/txn.go | 2 ++ 3 files changed, 18 insertions(+) diff --git a/internal/unionstore/pipelined_memdb.go b/internal/unionstore/pipelined_memdb.go index ffdf804f8d..2c1147dc73 100644 --- a/internal/unionstore/pipelined_memdb.go +++ b/internal/unionstore/pipelined_memdb.go @@ -55,6 +55,9 @@ type PipelinedMemDB struct { // Some([]) -> delete batchGetCache map[string]util.Option[[]byte] memChangeHook func(uint64) + + // metrics + flushWaitDuration time.Duration } const ( @@ -358,12 +361,14 @@ func (p *PipelinedMemDB) needFlush() bool { // FlushWait will wait for all flushing tasks are done and return the error if there is a failure. func (p *PipelinedMemDB) FlushWait() error { if p.flushingMemDB != nil { + now := time.Now() err := <-p.errCh if err != nil { err = p.handleAlreadyExistErr(err) } // cleanup the flushingMemDB so the next call of FlushWait will not wait for the error channel. p.flushingMemDB = nil + p.flushWaitDuration += time.Since(now) return err } return nil @@ -513,3 +518,8 @@ func (p *PipelinedMemDB) Checkpoint() *MemDBCheckpoint { func (p *PipelinedMemDB) RevertToCheckpoint(*MemDBCheckpoint) { panic("RevertToCheckpoint is not supported for PipelinedMemDB") } + +// FlushWaitDuration implements MemBuffer interface. +func (p *PipelinedMemDB) FlushWaitDuration() time.Duration { + return p.flushWaitDuration +} diff --git a/internal/unionstore/union_store.go b/internal/unionstore/union_store.go index d0ada377c6..2603a0d3cb 100644 --- a/internal/unionstore/union_store.go +++ b/internal/unionstore/union_store.go @@ -37,6 +37,7 @@ package unionstore import ( "context" "math" + "time" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/kv" @@ -236,6 +237,8 @@ type MemBuffer interface { Flush(force bool) (bool, error) // FlushWait waits for the flushing task done and return error. FlushWait() error + // FlushWaitDuration returns the total duration spent on FlushWait() + FlushWaitDuration() time.Duration } var ( @@ -287,3 +290,6 @@ func (db *MemDBWithContext) BatchGet(ctx context.Context, keys [][]byte) (map[st } return m, nil } + +// FlushWaitDuration implements the MemBuffer interface. +func (db *MemDBWithContext) FlushWaitDuration() time.Duration { return 0 } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index aea701f2df..2df7d10f0f 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -853,6 +853,7 @@ type TxnInfo struct { OnePCFallback bool `json:"one_pc_fallback"` ErrMsg string `json:"error,omitempty"` Pipelined bool `json:"pipelined"` + FlushWaitMs int64 `json:"flush_wait_ms"` } func (txn *KVTxn) onCommitted(err error) { @@ -875,6 +876,7 @@ func (txn *KVTxn) onCommitted(err error) { AsyncCommitFallback: txn.committer.hasTriedAsyncCommit && !isAsyncCommit, OnePCFallback: txn.committer.hasTriedOnePC && !isOnePC, Pipelined: txn.IsPipelined(), + FlushWaitMs: txn.GetMemBuffer().FlushWaitDuration().Milliseconds(), } if err != nil { info.ErrMsg = err.Error() From 712aca90dbb9293892fdc4b94d423ba6724f9365 Mon Sep 17 00:00:00 2001 From: ekexium Date: Tue, 14 May 2024 19:53:40 +0800 Subject: [PATCH 2/2] refactor: change FlushWaitDuration() to GetFlushMetrics() Signed-off-by: ekexium --- internal/unionstore/pipelined_memdb.go | 8 +++++--- internal/unionstore/union_store.go | 12 ++++++++---- txnkv/transaction/txn.go | 2 +- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/internal/unionstore/pipelined_memdb.go b/internal/unionstore/pipelined_memdb.go index 2c1147dc73..ddc8c69b70 100644 --- a/internal/unionstore/pipelined_memdb.go +++ b/internal/unionstore/pipelined_memdb.go @@ -519,7 +519,9 @@ func (p *PipelinedMemDB) RevertToCheckpoint(*MemDBCheckpoint) { panic("RevertToCheckpoint is not supported for PipelinedMemDB") } -// FlushWaitDuration implements MemBuffer interface. -func (p *PipelinedMemDB) FlushWaitDuration() time.Duration { - return p.flushWaitDuration +// GetFlushMetrics implements MemBuffer interface. +func (p *PipelinedMemDB) GetFlushMetrics() FlushMetrics { + return FlushMetrics{ + WaitDuration: p.flushWaitDuration, + } } diff --git a/internal/unionstore/union_store.go b/internal/unionstore/union_store.go index 2603a0d3cb..b2d4da6c55 100644 --- a/internal/unionstore/union_store.go +++ b/internal/unionstore/union_store.go @@ -237,8 +237,12 @@ type MemBuffer interface { Flush(force bool) (bool, error) // FlushWait waits for the flushing task done and return error. FlushWait() error - // FlushWaitDuration returns the total duration spent on FlushWait() - FlushWaitDuration() time.Duration + // GetFlushDetails returns the metrics related to flushing + GetFlushMetrics() FlushMetrics +} + +type FlushMetrics struct { + WaitDuration time.Duration } var ( @@ -291,5 +295,5 @@ func (db *MemDBWithContext) BatchGet(ctx context.Context, keys [][]byte) (map[st return m, nil } -// FlushWaitDuration implements the MemBuffer interface. -func (db *MemDBWithContext) FlushWaitDuration() time.Duration { return 0 } +// GetFlushMetrisc implements the MemBuffer interface. +func (db *MemDBWithContext) GetFlushMetrics() FlushMetrics { return FlushMetrics{} } diff --git a/txnkv/transaction/txn.go b/txnkv/transaction/txn.go index 2df7d10f0f..0ee2b5f215 100644 --- a/txnkv/transaction/txn.go +++ b/txnkv/transaction/txn.go @@ -876,7 +876,7 @@ func (txn *KVTxn) onCommitted(err error) { AsyncCommitFallback: txn.committer.hasTriedAsyncCommit && !isAsyncCommit, OnePCFallback: txn.committer.hasTriedOnePC && !isOnePC, Pipelined: txn.IsPipelined(), - FlushWaitMs: txn.GetMemBuffer().FlushWaitDuration().Milliseconds(), + FlushWaitMs: txn.GetMemBuffer().GetFlushMetrics().WaitDuration.Milliseconds(), } if err != nil { info.ErrMsg = err.Error()