diff --git a/executor/adapter.go b/executor/adapter.go index 3758d7d68c430..022caf555535f 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -87,6 +87,17 @@ var ( selectForUpdateRetryDuration = metrics.PessimisticDMLDurationByAttempt.WithLabelValues("select-for-update", "retry") dmlFirstAttemptDuration = metrics.PessimisticDMLDurationByAttempt.WithLabelValues("dml", "first-attempt") dmlRetryDuration = metrics.PessimisticDMLDurationByAttempt.WithLabelValues("dml", "retry") + + // aggressiveLockingTxnUsedCount counts transactions where at least one statement has aggressive locking enabled. + aggressiveLockingTxnUsedCount = metrics.AggressiveLockingUsageCount.WithLabelValues(metrics.LblAggressiveLockingTxnUsed) + // aggressiveLockingStmtUsedCount counts statements that have aggressive locking enabled. + aggressiveLockingStmtUsedCount = metrics.AggressiveLockingUsageCount.WithLabelValues(metrics.LblAggressiveLockingStmtUsed) + // aggressiveLockingTxnUsedCount counts transactions where at least one statement has aggressive locking enabled, + // and it takes effect (which is determined according to whether lock-with-conflict has occurred during execution). + aggressiveLockingTxnEffectiveCount = metrics.AggressiveLockingUsageCount.WithLabelValues(metrics.LblAggressiveLockingTxnEffective) + // aggressiveLockingTxnUsedCount counts statements where at least one statement has aggressive locking enabled, + // and it takes effect (which is determined according to whether lock-with-conflict has occurred during execution). + aggressiveLockingStmtEffectiveCount = metrics.AggressiveLockingUsageCount.WithLabelValues(metrics.LblAggressiveLockingStmtEffective) ) // processinfoSetter is the interface use to set current running process info. @@ -452,6 +463,18 @@ func (a *ExecStmt) Exec(ctx context.Context) (_ sqlexec.RecordSet, err error) { if lockKeysCnt > 0 { metrics.StatementLockKeysCount.Observe(float64(lockKeysCnt)) } + + execDetails := a.Ctx.GetSessionVars().StmtCtx.GetExecDetails() + if err == nil && execDetails.LockKeysDetail != nil && + (execDetails.LockKeysDetail.AggressiveLockNewCount > 0 || execDetails.LockKeysDetail.AggressiveLockDerivedCount > 0) { + a.Ctx.GetSessionVars().TxnCtx.AggressiveLockingUsed = true + // If this statement is finished when some of the keys are locked with conflict in the last retry, or + // some of the keys are derived from the previous retry, we consider the optimization of aggressive locking + // takes effect on this statement. + if execDetails.LockKeysDetail.LockedWithConflictCount > 0 || execDetails.LockKeysDetail.AggressiveLockDerivedCount > 0 { + a.Ctx.GetSessionVars().TxnCtx.AggressiveLockingEffective = true + } + } return } if str, ok := r.(string); !ok || !strings.Contains(str, memory.PanicMemoryExceed) { @@ -1472,6 +1495,28 @@ func (a *ExecStmt) FinishExecuteStmt(txnTS uint64, err error, hasMoreResults boo if sessVars.StmtCtx.ReadFromTableCache { metrics.ReadFromTableCacheCounter.Inc() } + + // Update aggressive locking related counters by stmt + if execDetail.LockKeysDetail != nil { + if execDetail.LockKeysDetail.AggressiveLockNewCount > 0 || execDetail.LockKeysDetail.AggressiveLockDerivedCount > 0 { + aggressiveLockingStmtUsedCount.Inc() + // If this statement is finished when some of the keys are locked with conflict in the last retry, or + // some of the keys are derived from the previous retry, we consider the optimization of aggressive locking + // takes effect on this statement. + if execDetail.LockKeysDetail.LockedWithConflictCount > 0 || execDetail.LockKeysDetail.AggressiveLockDerivedCount > 0 { + aggressiveLockingStmtEffectiveCount.Inc() + } + } + } + // If the transaction is committed, update aggressive locking related counters by txn + if execDetail.CommitDetail != nil { + if sessVars.TxnCtx.AggressiveLockingUsed { + aggressiveLockingTxnUsedCount.Inc() + } + if sessVars.TxnCtx.AggressiveLockingEffective { + aggressiveLockingTxnEffectiveCount.Inc() + } + } } func (a *ExecStmt) checkPlanReplayerCapture(txnTS uint64) { diff --git a/metrics/grafana/tidb.json b/metrics/grafana/tidb.json index 59daac4b0720e..612c62ee641bf 100644 --- a/metrics/grafana/tidb.json +++ b/metrics/grafana/tidb.json @@ -6799,6 +6799,204 @@ "align": false, "alignLevel": null } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "Counters of transactions or statements in which aggressive locking is enabled / takes effect.", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "grid": {}, + "gridPos": { + "h": 7, + "w": 8, + "x": 0, + "y": 74 + }, + "hiddenSeries": false, + "id": 295, + "legend": { + "alignAsTable": false, + "avg": false, + "current": false, + "hideEmpty": false, + "max": false, + "min": false, + "rightSide": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(irate(tidb_session_transaction_aggressive_locking_usage{instance=~\"$instance\"}[30s])) by (type)", + "interval": "", + "legendFormat": "{{type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Aggressive Locking Usage", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "description": "Counters of keys involved in aggressive locking.", + "fieldConfig": { + "defaults": {}, + "overrides": [] + }, + "fill": 1, + "fillGradient": 0, + "grid": {}, + "gridPos": { + "h": 7, + "w": 8, + "x": 8, + "y": 74 + }, + "hiddenSeries": false, + "id": 296, + "legend": { + "alignAsTable": false, + "avg": false, + "current": false, + "hideEmpty": false, + "max": false, + "min": false, + "rightSide": false, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "nullPointMode": "null", + "options": { + "alertThreshold": true + }, + "percentage": false, + "pluginVersion": "7.5.11", + "pointradius": 2, + "points": false, + "renderer": "flot", + "seriesOverrides": [], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "exemplar": true, + "expr": "sum(irate(tidb_tikvclient_aggressive_locking_count{instance=~\"$instance\"}[30s])) by (type)", + "interval": "", + "legendFormat": "{{type}}", + "refId": "A" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "Aggressive Locking Keys", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": "0", + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ], + "yaxis": { + "align": false, + "alignLevel": null + } } ], "repeat": null, @@ -12883,7 +13081,7 @@ "bars": false, "dashLength": 10, "dashes": false, - "datasource": "tidb-cluster", + "datasource": "${DS_TEST-CLUSTER}", "description": "Speed of add index", "editable": true, "error": false, diff --git a/metrics/metrics.go b/metrics/metrics.go index 1e25f9438c794..62b6973816424 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -212,6 +212,7 @@ func RegisterMetrics() { prometheus.MustRegister(AutoIDReqDuration) prometheus.MustRegister(RegionCheckpointSubscriptionEvent) prometheus.MustRegister(RCCheckTSWriteConfilictCounter) + prometheus.MustRegister(AggressiveLockingUsageCount) prometheus.MustRegister(TTLQueryDuration) prometheus.MustRegister(TTLProcessedExpiredRowsCounter) diff --git a/metrics/session.go b/metrics/session.go index 359afb1c5d310..962ed67477ebb 100644 --- a/metrics/session.go +++ b/metrics/session.go @@ -186,6 +186,14 @@ var ( Name: "resource_group_query_total", Help: "Counter of the total number of queries for the resource group", }, []string{LblName}) + + AggressiveLockingUsageCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "tidb", + Subsystem: "server", + Name: "transaction_aggressive_locking_usage", + Help: "The counter of statements and transactions in which aggressive locking is used or takes effect", + }, []string{LblType}) ) // Label constants. @@ -227,5 +235,11 @@ const ( LblModule = "module" LblRCReadCheckTS = "read_check" LblRCWriteCheckTS = "write_check" - LblName = "name" + + LblName = "name" + + LblAggressiveLockingTxnUsed = "txn-used" + LblAggressiveLockingTxnEffective = "txn-effective" + LblAggressiveLockingStmtUsed = "stmt-used" + LblAggressiveLockingStmtEffective = "stmt-effective" ) diff --git a/metrics/telemetry.go b/metrics/telemetry.go index db4c80714c0ea..c133610c56189 100644 --- a/metrics/telemetry.go +++ b/metrics/telemetry.go @@ -484,3 +484,25 @@ func GetStoreBatchCoprCounter() StoreBatchCoprCounter { BatchedFallbackCount: readCounter(TelemetryStoreBatchedFallbackCnt), } } + +// AggressiveLockingUsageCounter records the usage of Aggressive Locking feature of pessimistic transaction. +type AggressiveLockingUsageCounter struct { + TxnAggressiveLockingUsed int64 `json:"txn_aggressive_locking_used"` + TxnAggressiveLockingEffective int64 `json:"txn_aggressive_locking_effective"` +} + +// Sub returns the difference of two counters. +func (i AggressiveLockingUsageCounter) Sub(rhs AggressiveLockingUsageCounter) AggressiveLockingUsageCounter { + return AggressiveLockingUsageCounter{ + TxnAggressiveLockingUsed: i.TxnAggressiveLockingUsed - rhs.TxnAggressiveLockingUsed, + TxnAggressiveLockingEffective: i.TxnAggressiveLockingEffective - rhs.TxnAggressiveLockingEffective, + } +} + +// GetAggressiveLockingUsageCounter returns the Aggressive Locking usage counter. +func GetAggressiveLockingUsageCounter() AggressiveLockingUsageCounter { + return AggressiveLockingUsageCounter{ + TxnAggressiveLockingUsed: readCounter(AggressiveLockingUsageCount.WithLabelValues(LblAggressiveLockingTxnUsed)), + TxnAggressiveLockingEffective: readCounter(AggressiveLockingUsageCount.WithLabelValues(LblAggressiveLockingTxnEffective)), + } +} diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 0c4c2cb862983..9a2490d2bc424 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -240,6 +240,14 @@ type TxnCtxNoNeedToRestore struct { EnableMDL bool // relatedTableForMDL records the `lock` table for metadata lock. It maps from int64 to int64(version). relatedTableForMDL *sync.Map + + // AggressiveLockingUsed marking whether at least one of the statements in the transaction was executed in + // aggressive locking mode. + AggressiveLockingUsed bool + // AggressiveLockingEffective marking whether at least one of the statements in the transaction was executed in + // aggressive locking mode, and it takes effect (which is determined according to whether lock-with-conflict + // has occurred during execution of any statement). + AggressiveLockingEffective bool } // SavepointRecord indicates a transaction's savepoint record. diff --git a/telemetry/data.go b/telemetry/data.go index 2a3ecffddaa1e..a3dc3947954dd 100644 --- a/telemetry/data.go +++ b/telemetry/data.go @@ -71,6 +71,7 @@ func postReportTelemetryData() { postReportDDLUsage() postReportIndexMergeUsage() postStoreBatchUsage() + postReportAggressiveLockingUsageCounter() } // PostReportTelemetryDataForTest is for test. diff --git a/telemetry/data_feature_usage.go b/telemetry/data_feature_usage.go index 1a186cc3451d2..1d782dcd53632 100644 --- a/telemetry/data_feature_usage.go +++ b/telemetry/data_feature_usage.go @@ -246,15 +246,17 @@ func getClusterIndexUsageInfo(ctx context.Context, sctx sessionctx.Context) (ncu // TxnUsage records the usage info of transaction related features, including // async-commit, 1PC and counters of transactions committed with different protocols. type TxnUsage struct { - AsyncCommitUsed bool `json:"asyncCommitUsed"` - OnePCUsed bool `json:"onePCUsed"` - TxnCommitCounter metrics.TxnCommitCounter `json:"txnCommitCounter"` - MutationCheckerUsed bool `json:"mutationCheckerUsed"` - AssertionLevel string `json:"assertionLevel"` - RcCheckTS bool `json:"rcCheckTS"` - RCWriteCheckTS bool `json:"rcWriteCheckTS"` - SavepointCounter int64 `json:"SavepointCounter"` - LazyUniqueCheckSetCounter int64 `json:"lazyUniqueCheckSetCounter"` + AsyncCommitUsed bool `json:"asyncCommitUsed"` + OnePCUsed bool `json:"onePCUsed"` + TxnCommitCounter metrics.TxnCommitCounter `json:"txnCommitCounter"` + MutationCheckerUsed bool `json:"mutationCheckerUsed"` + AssertionLevel string `json:"assertionLevel"` + RcCheckTS bool `json:"rcCheckTS"` + RCWriteCheckTS bool `json:"rcWriteCheckTS"` + AggressiveLocking bool `json:"aggressiveLocking"` + SavepointCounter int64 `json:"SavepointCounter"` + LazyUniqueCheckSetCounter int64 `json:"lazyUniqueCheckSetCounter"` + AggressiveLockingUsageCounter m.AggressiveLockingUsageCounter `json:"AggressiveLockingUsageCounter"` } var initialTxnCommitCounter metrics.TxnCommitCounter @@ -269,6 +271,7 @@ var initialLazyPessimisticUniqueCheckSetCount int64 var initialDDLUsageCounter m.DDLUsageCounter var initialIndexMergeCounter m.IndexMergeUsageCounter var initialStoreBatchCoprCounter m.StoreBatchCoprCounter +var initialAggressiveLockingUsageCounter m.AggressiveLockingUsageCounter // getTxnUsageInfo gets the usage info of transaction related features. It's exported for tests. func getTxnUsageInfo(ctx sessionctx.Context) *TxnUsage { @@ -298,13 +301,23 @@ func getTxnUsageInfo(ctx sessionctx.Context) *TxnUsage { if val, err := ctx.GetSessionVars().GetGlobalSystemVar(context.Background(), variable.TiDBRCWriteCheckTs); err == nil { rcWriteCheckTSUsed = val == variable.On } + aggressiveLockingUsed := false + if val, err := ctx.GetSessionVars().GetGlobalSystemVar(context.Background(), variable.TiDBPessimisticTransactionAggressiveLocking); err == nil { + aggressiveLockingUsed = val == variable.On + } + currSavepointCount := m.GetSavepointStmtCounter() diffSavepointCount := currSavepointCount - initialSavepointStmtCounter + currLazyUniqueCheckSetCount := m.GetLazyPessimisticUniqueCheckSetCounter() diffLazyUniqueCheckSetCount := currLazyUniqueCheckSetCount - initialLazyPessimisticUniqueCheckSetCount + + currAggressiveLockingUsageCounter := m.GetAggressiveLockingUsageCounter() + diffAggressiveLockingUsageCounter := currAggressiveLockingUsageCounter.Sub(initialAggressiveLockingUsageCounter) + return &TxnUsage{asyncCommitUsed, onePCUsed, diff, mutationCheckerUsed, assertionUsed, rcCheckTSUsed, rcWriteCheckTSUsed, - diffSavepointCount, diffLazyUniqueCheckSetCount, + aggressiveLockingUsed, diffSavepointCount, diffLazyUniqueCheckSetCount, diffAggressiveLockingUsageCounter, } } @@ -329,6 +342,10 @@ func postReportLazyPessimisticUniqueCheckSetCount() { initialLazyPessimisticUniqueCheckSetCount = m.GetLazyPessimisticUniqueCheckSetCounter() } +func postReportAggressiveLockingUsageCounter() { + initialAggressiveLockingUsageCounter = m.GetAggressiveLockingUsageCounter() +} + // getCTEUsageInfo gets the CTE usages. func getCTEUsageInfo() *m.CTEUsageCounter { curr := m.GetCTECounter() diff --git a/telemetry/data_feature_usage_test.go b/telemetry/data_feature_usage_test.go index f8e5701f0e69a..0630c26b31471 100644 --- a/telemetry/data_feature_usage_test.go +++ b/telemetry/data_feature_usage_test.go @@ -74,6 +74,14 @@ func TestTxnUsageInfo(t *testing.T) { tk.MustExec(fmt.Sprintf("set global %s = 1", variable.TiDBRCWriteCheckTs)) txnUsage = telemetry.GetTxnUsageInfo(tk.Session()) require.True(t, txnUsage.RCWriteCheckTS) + + tk.MustExec(fmt.Sprintf("set global %s = 0", variable.TiDBPessimisticTransactionAggressiveLocking)) + txnUsage = telemetry.GetTxnUsageInfo(tk.Session()) + require.False(t, txnUsage.AggressiveLocking) + + tk.MustExec(fmt.Sprintf("set global %s = 1", variable.TiDBPessimisticTransactionAggressiveLocking)) + txnUsage = telemetry.GetTxnUsageInfo(tk.Session()) + require.True(t, txnUsage.AggressiveLocking) }) t.Run("Count", func(t *testing.T) { @@ -844,3 +852,72 @@ func TestStoreBatchCopr(t *testing.T) { require.Equal(t, diff.BatchedCount, int64(1)) require.Equal(t, diff.BatchedFallbackCount, int64(1)) } + +func TestAggressiveLockingUsage(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk2 := testkit.NewTestKit(t, store) + + tk.MustExec("use test") + tk.MustExec("create table t (id int primary key, v int)") + tk.MustExec("insert into t values (1, 1), (2, 2)") + + usage, err := telemetry.GetFeatureUsage(tk2.Session()) + require.NoError(t, err) + require.Equal(t, int64(0), usage.Txn.AggressiveLockingUsageCounter.TxnAggressiveLockingUsed) + require.Equal(t, int64(0), usage.Txn.AggressiveLockingUsageCounter.TxnAggressiveLockingEffective) + + tk.MustExec("set @@tidb_pessimistic_txn_aggressive_locking = 1") + + tk.MustExec("begin pessimistic") + tk.MustExec("update t set v = v + 1 where id = 1") + usage, err = telemetry.GetFeatureUsage(tk2.Session()) + // Not counted before transaction committing. + require.NoError(t, err) + require.Equal(t, int64(0), usage.Txn.AggressiveLockingUsageCounter.TxnAggressiveLockingUsed) + require.Equal(t, int64(0), usage.Txn.AggressiveLockingUsageCounter.TxnAggressiveLockingEffective) + + tk.MustExec("commit") + usage, err = telemetry.GetFeatureUsage(tk2.Session()) + require.NoError(t, err) + require.Equal(t, int64(1), usage.Txn.AggressiveLockingUsageCounter.TxnAggressiveLockingUsed) + require.Equal(t, int64(0), usage.Txn.AggressiveLockingUsageCounter.TxnAggressiveLockingEffective) + + // Counted by transaction instead of by statement. + tk.MustExec("begin pessimistic") + tk.MustExec("update t set v = v + 1 where id = 1") + tk.MustExec("update t set v = v + 1 where id = 2") + tk.MustExec("commit") + usage, err = telemetry.GetFeatureUsage(tk2.Session()) + require.NoError(t, err) + require.Equal(t, int64(2), usage.Txn.AggressiveLockingUsageCounter.TxnAggressiveLockingUsed) + require.Equal(t, int64(0), usage.Txn.AggressiveLockingUsageCounter.TxnAggressiveLockingEffective) + + // Effective only when LockedWithConflict occurs. + tk3 := testkit.NewTestKit(t, store) + tk3.MustExec("use test") + tk.MustExec("begin pessimistic") + tk3.MustExec("begin pessimistic") + tk3.MustExec("update t set v = v + 1 where id = 1") + ch := make(chan interface{}) + go func() { + tk.MustExec("update t set v = v + 1 where id = 1") + ch <- nil + }() + select { + case <-ch: + require.Fail(t, "expected statement to be blocked but finished") + case <-time.After(time.Millisecond * 100): + } + tk3.MustExec("commit") + select { + case <-time.After(time.Second): + require.Fail(t, "expected statement to be resumed but still blocked") + case <-ch: + } + tk.MustExec("commit") + usage, err = telemetry.GetFeatureUsage(tk2.Session()) + require.NoError(t, err) + require.Equal(t, int64(3), usage.Txn.AggressiveLockingUsageCounter.TxnAggressiveLockingUsed) + require.Equal(t, int64(1), usage.Txn.AggressiveLockingUsageCounter.TxnAggressiveLockingEffective) +}