diff --git a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go index d3b9fc399f3c..268c0a6410f3 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_stmt.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_stmt.go @@ -42,6 +42,22 @@ func init() { const telemetryPath = `changefeed.alter` +type telemetryAggregator struct { + txn *kv.Txn +} + +func (a *telemetryAggregator) Count(path string) { + a.txn.AddCommitTrigger(func(context.Context) { + telemetry.Count(path) + }) +} + +func (a *telemetryAggregator) CountBucketed(path string, value int64) { + a.txn.AddCommitTrigger(func(context.Context) { + telemetry.CountBucketed(path, value) + }) +} + func alterChangefeedTypeCheck( ctx context.Context, stmt tree.Statement, p sql.PlanHookState, ) (matched bool, header colinfo.ResultColumns, _ error) { @@ -116,8 +132,11 @@ func alterChangefeedPlanHook( return err } exprEval := p.ExprEvaluator("ALTER CHANGEFEED") + telemetryAgg := &telemetryAggregator{ + txn: p.Txn(), + } newOptions, newSinkURI, err := generateNewOpts( - ctx, exprEval, alterChangefeedStmt.Cmds, prevOpts, prevDetails.SinkURI, + ctx, exprEval, alterChangefeedStmt.Cmds, prevOpts, prevDetails.SinkURI, telemetryAgg, ) if err != nil { return err @@ -128,6 +147,7 @@ func alterChangefeedPlanHook( alterChangefeedStmt.Cmds, newOptions.AsMap(), // TODO: Remove .AsMap() prevDetails, job.Progress(), + telemetryAgg, ) if err != nil { return err @@ -200,8 +220,7 @@ func alterChangefeedPlanHook( if err != nil { return err } - - telemetry.Count(telemetryPath) + telemetryAgg.Count(telemetryPath) select { case <-ctx.Done(): @@ -253,6 +272,7 @@ func generateNewOpts( alterCmds tree.AlterChangefeedCmds, prevOpts map[string]string, prevSinkURI string, + telemetryAgg *telemetryAggregator, ) (changefeedbase.StatementOptions, string, error) { sinkURI := prevSinkURI newOptions := prevOpts @@ -298,7 +318,7 @@ func generateNewOpts( newOptions[key] = value } } - telemetry.CountBucketed(telemetryPath+`.set_options`, int64(len(opts))) + telemetryAgg.CountBucketed(telemetryPath+`.set_options`, int64(len(opts))) case *tree.AlterChangefeedUnsetOptions: optKeys := v.Options.ToStrings() for _, key := range optKeys { @@ -313,7 +333,7 @@ func generateNewOpts( } delete(newOptions, key) } - telemetry.CountBucketed(telemetryPath+`.unset_options`, int64(len(optKeys))) + telemetryAgg.CountBucketed(telemetryPath+`.unset_options`, int64(len(optKeys))) } } @@ -328,6 +348,7 @@ func generateNewTargets( opts map[string]string, prevDetails jobspb.ChangefeedDetails, prevProgress jobspb.Progress, + telemetryAgg *telemetryAggregator, ) ( tree.ChangefeedTargets, *jobspb.Progress, @@ -522,7 +543,7 @@ func generateNewTargets( if err != nil { return nil, nil, hlc.Timestamp{}, nil, err } - telemetry.CountBucketed(telemetryPath+`.added_targets`, int64(len(v.Targets))) + telemetryAgg.CountBucketed(telemetryPath+`.added_targets`, int64(len(v.Targets))) case *tree.AlterChangefeedDropTarget: for _, target := range v.Targets { desc, found, err := getTargetDesc(ctx, p, descResolver, target.TableName) @@ -548,7 +569,7 @@ func generateNewTargets( } delete(newTargets, k) } - telemetry.CountBucketed(telemetryPath+`.dropped_targets`, int64(len(v.Targets))) + telemetryAgg.CountBucketed(telemetryPath+`.dropped_targets`, int64(len(v.Targets))) } } diff --git a/pkg/ccl/changefeedccl/alter_changefeed_test.go b/pkg/ccl/changefeedccl/alter_changefeed_test.go index 39aefc5108fb..c3a3473ba40a 100644 --- a/pkg/ccl/changefeedccl/alter_changefeed_test.go +++ b/pkg/ccl/changefeedccl/alter_changefeed_test.go @@ -440,6 +440,53 @@ func TestAlterChangefeedTelemetry(t *testing.T) { cdcTest(t, testFn, feedTestEnterpriseSinks) } +func TestAlterChangefeedTelemetryTxnRetry(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`) + sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO bar VALUES (1)`) + sqlDB.Exec(t, `CREATE TABLE baz (a INT PRIMARY KEY)`) + sqlDB.Exec(t, `INSERT INTO baz VALUES (1)`) + + // Reset the counts. + _ = telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts) + + testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`) + defer closeFeed(t, testFeed) + feed := testFeed.(cdctest.EnterpriseTestFeed) + require.NoError(t, feed.Pause()) + + // Here we issue the alter stmt, rollback the txn, and + // issue it again. + alterStmt := fmt.Sprintf("ALTER CHANGEFEED %d DROP bar, foo ADD baz UNSET diff SET resolved, format=json", feed.JobID()) + tx, err := s.DB.Begin() + require.NoError(t, err) + _, err = tx.Exec("SAVEPOINT a") + require.NoError(t, err) + _, err = tx.Exec(alterStmt) + require.NoError(t, err) + _, err = tx.Exec("ROLLBACK TO a") + require.NoError(t, err) + _, err = tx.Exec(alterStmt) + require.NoError(t, err) + require.NoError(t, tx.Commit()) + + counts := telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts) + require.Equal(t, int32(1), counts[`changefeed.alter`]) + require.Equal(t, int32(1), counts[`changefeed.alter.dropped_targets.2`]) + require.Equal(t, int32(1), counts[`changefeed.alter.added_targets.1`]) + require.Equal(t, int32(1), counts[`changefeed.alter.set_options.2`]) + require.Equal(t, int32(1), counts[`changefeed.alter.unset_options.1`]) + } + + cdcTest(t, testFn, feedTestEnterpriseSinks) +} + // The purpose of this test is to ensure that the ALTER CHANGEFEED statement // does not accidentally redact secret keys in the changefeed details func TestAlterChangefeedPersistSinkURI(t *testing.T) {