diff --git a/pkg/ccl/backupccl/backup_processor_planning.go b/pkg/ccl/backupccl/backup_processor_planning.go index 4bfdeb016f1f..2e3a53058f96 100644 --- a/pkg/ccl/backupccl/backup_processor_planning.go +++ b/pkg/ccl/backupccl/backup_processor_planning.go @@ -201,7 +201,6 @@ func distBackup( noTxn, /* txn - the flow does not read or write the database */ nil, /* clockUpdater */ evalCtx.Tracing, - evalCtx.ExecCfg.ContentionRegistry, ) defer recv.Release() diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go index 698ccd56d23f..dad8938bc7ee 100644 --- a/pkg/ccl/backupccl/restore_processor_planning.go +++ b/pkg/ccl/backupccl/restore_processor_planning.go @@ -271,7 +271,6 @@ func distRestore( noTxn, /* txn - the flow does not read or write the database */ nil, /* clockUpdater */ evalCtx.Tracing, - evalCtx.ExecCfg.ContentionRegistry, ) defer recv.Release() diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval.go b/pkg/ccl/changefeedccl/cdceval/expr_eval.go index 42a94a8d8390..43a82b18ffa1 100644 --- a/pkg/ccl/changefeedccl/cdceval/expr_eval.go +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval.go @@ -282,7 +282,6 @@ func (e *Evaluator) executePlan( nil, nil, /* clockUpdater */ &sql.SessionTracing{}, - e.execCfg.ContentionRegistry, ) // Start execution. diff --git a/pkg/ccl/changefeedccl/changefeed_dist.go b/pkg/ccl/changefeedccl/changefeed_dist.go index ec562bd8f5da..e534e8d59c5a 100644 --- a/pkg/ccl/changefeedccl/changefeed_dist.go +++ b/pkg/ccl/changefeedccl/changefeed_dist.go @@ -292,7 +292,6 @@ func startDistChangefeed( noTxn, nil, /* clockUpdater */ evalCtx.Tracing, - execCtx.ExecCfg().ContentionRegistry, ) defer recv.Release() diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go index e973ce67c4bb..7bb18ad54874 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go @@ -148,7 +148,6 @@ func distStreamIngest( noTxn, nil, /* clockUpdater */ evalCtx.Tracing, - execCfg.ContentionRegistry, ) defer recv.Release() diff --git a/pkg/sql/apply_join.go b/pkg/sql/apply_join.go index 4d648255142b..9ae9071f2a63 100644 --- a/pkg/sql/apply_join.go +++ b/pkg/sql/apply_join.go @@ -272,7 +272,6 @@ func runPlanInsidePlan( params.p.Txn(), params.ExecCfg().Clock, params.p.extendedEvalCtx.Tracing, - params.p.ExecCfg().ContentionRegistry, ) defer recv.Release() diff --git a/pkg/sql/backfill.go b/pkg/sql/backfill.go index 9e598eb8aa40..c07fa088acd7 100644 --- a/pkg/sql/backfill.go +++ b/pkg/sql/backfill.go @@ -1085,7 +1085,6 @@ func (sc *SchemaChanger) distIndexBackfill( nil, /* txn - the flow does not run wholly in a txn */ sc.clock, evalCtx.Tracing, - sc.execCfg.ContentionRegistry, ) defer recv.Release() @@ -1330,7 +1329,6 @@ func (sc *SchemaChanger) distColumnBackfill( nil, /* txn - the flow does not run wholly in a txn */ sc.clock, evalCtx.Tracing, - sc.execCfg.ContentionRegistry, ) defer recv.Release() diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index 274b6c9421fc..338a8b16e796 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -1621,7 +1621,6 @@ func (ex *connExecutor) execWithDistSQLEngine( planner.txn, ex.server.cfg.Clock, &ex.sessionTracing, - ex.server.cfg.ContentionRegistry, ) recv.progressAtomic = progressAtomic if ex.server.cfg.TestingKnobs.DistSQLReceiverPushCallbackFactory != nil { @@ -2445,6 +2444,7 @@ func (ex *connExecutor) recordTransactionFinish( if contentionDuration := ex.extraTxnState.accumulatedStats.ContentionTime.Nanoseconds(); contentionDuration > 0 { ex.metrics.EngineMetrics.SQLContendedTxns.Inc(1) + ex.planner.DistSQLPlanner().distSQLSrv.Metrics.ContendedQueriesCount.Inc(1) } ex.txnIDCacheWriter.Record(contentionpb.ResolvedTxnID{ diff --git a/pkg/sql/contentionpb/contention.proto b/pkg/sql/contentionpb/contention.proto index 66388d97e05e..e0d1d62410d3 100644 --- a/pkg/sql/contentionpb/contention.proto +++ b/pkg/sql/contentionpb/contention.proto @@ -13,7 +13,6 @@ package cockroach.sql.contentionpb; option go_package = "contentionpb"; import "roachpb/api.proto"; - import "gogoproto/gogo.proto"; import "google/protobuf/duration.proto"; import "google/protobuf/timestamp.proto"; @@ -167,8 +166,8 @@ message ExtendedContentionEvent { (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TransactionFingerprintID" ]; - google.protobuf.Timestamp collection_ts = 5 [ - (gogoproto.nullable) = false, - (gogoproto.stdtime) = true - ]; + google.protobuf.Timestamp collection_ts = 5 [ + (gogoproto.nullable) = false, + (gogoproto.stdtime) = true + ]; } diff --git a/pkg/sql/distsql_physical_planner_test.go b/pkg/sql/distsql_physical_planner_test.go index fb478755a178..d2f9f7f68bb1 100644 --- a/pkg/sql/distsql_physical_planner_test.go +++ b/pkg/sql/distsql_physical_planner_test.go @@ -204,7 +204,6 @@ func TestDistSQLReceiverUpdatesCaches(t *testing.T) { nil, /* txn */ nil, /* clockUpdater */ &SessionTracing{}, - nil, /* contentionRegistry */ ) replicas := []roachpb.ReplicaDescriptor{{ReplicaID: 1}, {ReplicaID: 2}, {ReplicaID: 3}} diff --git a/pkg/sql/distsql_plan_changefeed_test.go b/pkg/sql/distsql_plan_changefeed_test.go index 9b99f18e7f76..83b1da638a62 100644 --- a/pkg/sql/distsql_plan_changefeed_test.go +++ b/pkg/sql/distsql_plan_changefeed_test.go @@ -424,7 +424,6 @@ func TestCdcExpressionExecution(t *testing.T) { nil, nil, /* clockUpdater */ planner.extendedEvalCtx.Tracing, - planner.execCfg.ContentionRegistry, ) defer r.Release() diff --git a/pkg/sql/distsql_plan_stats.go b/pkg/sql/distsql_plan_stats.go index ada5c1c7932d..93b04e5b1a6c 100644 --- a/pkg/sql/distsql_plan_stats.go +++ b/pkg/sql/distsql_plan_stats.go @@ -517,7 +517,6 @@ func (dsp *DistSQLPlanner) planAndRunCreateStats( txn, evalCtx.ExecCfg.Clock, evalCtx.Tracing, - evalCtx.ExecCfg.ContentionRegistry, ) defer recv.Release() diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 55795327014e..e65182a782c4 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -30,8 +30,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/colflow" - "github.com/cockroachdb/cockroach/pkg/sql/contention" - "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/sql/distsql" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -49,7 +47,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/ring" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -57,7 +54,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" - pbtypes "github.com/gogo/protobuf/types" ) var settingDistSQLNumRunners = settings.RegisterIntSetting( @@ -761,7 +757,6 @@ func (dsp *DistSQLPlanner) Run( defer dsp.distSQLSrv.ServerConfig.Metrics.QueryStop() recv.outputTypes = plan.GetResultTypes() - recv.contendedQueryMetric = dsp.distSQLSrv.Metrics.ContendedQueriesCount if multitenant.TenantRUEstimateEnabled.Get(&dsp.st.SV) && dsp.distSQLSrv.TenantCostController != nil && planCtx.planner != nil { if instrumentation := planCtx.planner.curPlan.instrumentation; instrumentation != nil { @@ -909,12 +904,6 @@ type DistSQLReceiver struct { expectedRowsRead int64 progressAtomic *uint64 - // contendedQueryMetric is a Counter that is incremented at most once if the - // query produces at least one contention event. - contendedQueryMetric *metric.Counter - // contentionRegistry is a Registry that contention events are added to. - contentionRegistry *contention.Registry - testingKnobs struct { // pushCallback, if set, will be called every time DistSQLReceiver.Push // is called, with the same arguments. @@ -1129,7 +1118,6 @@ func MakeDistSQLReceiver( txn *kv.Txn, clockUpdater clockUpdater, tracing *SessionTracing, - contentionRegistry *contention.Registry, ) *DistSQLReceiver { consumeCtx, cleanup := tracing.TraceExecConsume(ctx) r := receiverSyncPool.Get().(*DistSQLReceiver) @@ -1142,15 +1130,14 @@ func MakeDistSQLReceiver( } } *r = DistSQLReceiver{ - ctx: consumeCtx, - cleanup: cleanup, - rangeCache: rangeCache, - txn: txn, - clockUpdater: clockUpdater, - stats: &topLevelQueryStats{}, - stmtType: stmtType, - tracing: tracing, - contentionRegistry: contentionRegistry, + ctx: consumeCtx, + cleanup: cleanup, + rangeCache: rangeCache, + txn: txn, + clockUpdater: clockUpdater, + stats: &topLevelQueryStats{}, + stmtType: stmtType, + tracing: tracing, } r.resultWriterMu.row = resultWriter r.resultWriterMu.batch = batchWriter @@ -1169,15 +1156,14 @@ func (r *DistSQLReceiver) Release() { func (r *DistSQLReceiver) clone() *DistSQLReceiver { ret := receiverSyncPool.Get().(*DistSQLReceiver) *ret = DistSQLReceiver{ - ctx: r.ctx, - cleanup: func() {}, - rangeCache: r.rangeCache, - txn: r.txn, - clockUpdater: r.clockUpdater, - stats: r.stats, - stmtType: tree.Rows, - tracing: r.tracing, - contentionRegistry: r.contentionRegistry, + ctx: r.ctx, + cleanup: func() {}, + rangeCache: r.rangeCache, + txn: r.txn, + clockUpdater: r.clockUpdater, + stats: r.stats, + stmtType: tree.Rows, + tracing: r.tracing, } return ret } @@ -1291,30 +1277,6 @@ func (r *DistSQLReceiver) pushMeta(meta *execinfrapb.ProducerMetadata) execinfra if span := tracing.SpanFromContext(r.ctx); span != nil { span.ImportRemoteRecording(meta.TraceData) } - var ev roachpb.ContentionEvent - for i := range meta.TraceData { - meta.TraceData[i].Structured(func(any *pbtypes.Any, _ time.Time) { - if !pbtypes.Is(any, &ev) { - return - } - if err := pbtypes.UnmarshalAny(any, &ev); err != nil { - return - } - if r.contendedQueryMetric != nil { - // Increment the contended query metric at most once - // if the query sees at least one contention event. - r.contendedQueryMetric.Inc(1) - r.contendedQueryMetric = nil - } - contentionEvent := contentionpb.ExtendedContentionEvent{ - BlockingEvent: ev, - } - if r.txn != nil { - contentionEvent.WaitingTxnID = r.txn.ID() - } - r.contentionRegistry.AddContentionEvent(contentionEvent) - }) - } } if meta.Metrics != nil { r.stats.bytesRead += meta.Metrics.BytesRead diff --git a/pkg/sql/distsql_running_test.go b/pkg/sql/distsql_running_test.go index 9b4606f807b9..8b78ac8a0b0b 100644 --- a/pkg/sql/distsql_running_test.go +++ b/pkg/sql/distsql_running_test.go @@ -162,7 +162,6 @@ func TestDistSQLRunningInAbortedTxn(t *testing.T) { txn, execCfg.Clock, p.ExtendedEvalContext().Tracing, - execCfg.ContentionRegistry, ) // We need to re-plan every time, since the plan is closed automatically @@ -223,7 +222,6 @@ func TestDistSQLReceiverErrorRanking(t *testing.T) { txn, nil, /* clockUpdater */ &SessionTracing{}, - nil, /* contentionRegistry */ ) retryErr := roachpb.NewErrorWithTxn( @@ -367,7 +365,6 @@ func TestDistSQLReceiverDrainsOnError(t *testing.T) { nil, /* txn */ nil, /* clockUpdater */ &SessionTracing{}, - nil, /* contentionRegistry */ ) status := recv.Push(nil /* row */, &execinfrapb.ProducerMetadata{Err: errors.New("some error")}) require.Equal(t, execinfra.DrainRequested, status) diff --git a/pkg/sql/executor_statement_metrics.go b/pkg/sql/executor_statement_metrics.go index 601aa0c9f681..93e5089861b8 100644 --- a/pkg/sql/executor_statement_metrics.go +++ b/pkg/sql/executor_statement_metrics.go @@ -15,6 +15,7 @@ import ( "strconv" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/contentionpb" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/sql/idxrecommendations" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" @@ -185,7 +186,7 @@ func (ex *connExecutor) recordStatementSummary( } recordedStmtStats := sqlstats.RecordedStmtStats{ SessionID: ex.sessionID, - StatementID: planner.stmt.QueryID, + StatementID: stmt.QueryID, AutoRetryCount: automaticRetryCount, AutoRetryReason: ex.state.mu.autoRetryReason, RowsAffected: rowsAffected, @@ -226,6 +227,15 @@ func (ex *connExecutor) recordStatementSummary( // Record statement execution statistics if span is recorded and no error was // encountered while collecting query-level statistics. if queryLevelStatsOk { + for _, ev := range queryLevelStats.ContentionEvents { + contentionEvent := contentionpb.ExtendedContentionEvent{ + BlockingEvent: ev, + WaitingTxnID: planner.txn.ID(), + } + + ex.server.cfg.ContentionRegistry.AddContentionEvent(contentionEvent) + } + err = ex.statsCollector.RecordStatementExecStats(recordedStmtStatsKey, *queryLevelStats) if err != nil { if log.V(2 /* level */) { diff --git a/pkg/sql/importer/import_processor_planning.go b/pkg/sql/importer/import_processor_planning.go index bca3aab66abe..707a710a21e2 100644 --- a/pkg/sql/importer/import_processor_planning.go +++ b/pkg/sql/importer/import_processor_planning.go @@ -225,7 +225,6 @@ func distImport( nil, /* txn - the flow does not read or write the database */ nil, /* clockUpdater */ evalCtx.Tracing, - evalCtx.ExecCfg.ContentionRegistry, ) defer recv.Release() diff --git a/pkg/sql/index_backfiller.go b/pkg/sql/index_backfiller.go index ca31cf02ca58..b8df168f4ba2 100644 --- a/pkg/sql/index_backfiller.go +++ b/pkg/sql/index_backfiller.go @@ -197,7 +197,6 @@ func (ib *IndexBackfillPlanner) plan( nil, /* txn - the flow does not run wholly in a txn */ ib.execCfg.Clock, evalCtx.Tracing, - ib.execCfg.ContentionRegistry, ) defer recv.Release() evalCtxCopy := evalCtx diff --git a/pkg/sql/mvcc_backfiller.go b/pkg/sql/mvcc_backfiller.go index 70555ae4ce46..4eaf6d45374c 100644 --- a/pkg/sql/mvcc_backfiller.go +++ b/pkg/sql/mvcc_backfiller.go @@ -151,7 +151,6 @@ func (im *IndexBackfillerMergePlanner) plan( nil, /* txn - the flow does not run wholly in a txn */ im.execCfg.Clock, evalCtx.Tracing, - im.execCfg.ContentionRegistry, ) defer recv.Release() evalCtxCopy := evalCtx diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go index 4ebda18e3de4..fcc1765ac1a6 100644 --- a/pkg/sql/schema_changer.go +++ b/pkg/sql/schema_changer.go @@ -336,7 +336,6 @@ func (sc *SchemaChanger) backfillQueryIntoTable( // because it sets "enabled: false" and thus none of the // other fields are used. &SessionTracing{}, - sc.execCfg.ContentionRegistry, ) defer recv.Release() diff --git a/pkg/sql/testutils.go b/pkg/sql/testutils.go index 030caee9ae1c..bad5dba7eb43 100644 --- a/pkg/sql/testutils.go +++ b/pkg/sql/testutils.go @@ -144,7 +144,6 @@ func (dsp *DistSQLPlanner) Exec( p.txn, execCfg.Clock, p.ExtendedEvalContext().Tracing, - execCfg.ContentionRegistry, ) defer recv.Release() @@ -174,7 +173,6 @@ func (dsp *DistSQLPlanner) ExecLocalAll( p.txn, execCfg.Clock, p.ExtendedEvalContext().Tracing, - execCfg.ContentionRegistry, ) defer recv.Release() diff --git a/pkg/sql/ttl/ttljob/ttljob.go b/pkg/sql/ttl/ttljob/ttljob.go index 65685384011c..cf7420732754 100644 --- a/pkg/sql/ttl/ttljob/ttljob.go +++ b/pkg/sql/ttl/ttljob/ttljob.go @@ -307,7 +307,6 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err nil, /* txn */ nil, /* clockUpdater */ evalCtx.Tracing, - execCfg.ContentionRegistry, ) defer distSQLReceiver.Release()