Skip to content

Commit

Permalink
changefeedccl: avoid telemetry double count on txn retry
Browse files Browse the repository at this point in the history
Previously, if the ALTER CHANGEFEED txn retried, we would increment
these counters again.

In cockroachdb#91563 we are making a change that made it more likely that this
transaction may retry during one of the test, revealing the issue.

Release note: None
  • Loading branch information
stevendanna committed Nov 13, 2022
1 parent f82dead commit e1b2b67
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 7 deletions.
35 changes: 28 additions & 7 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,22 @@ func init() {

const telemetryPath = `changefeed.alter`

type telemetryAggregator struct {
txn *kv.Txn
}

func (a *telemetryAggregator) Count(path string) {
a.txn.AddCommitTrigger(func(context.Context) {
telemetry.Count(path)
})
}

func (a *telemetryAggregator) CountBucketed(path string, value int64) {
a.txn.AddCommitTrigger(func(context.Context) {
telemetry.CountBucketed(path, value)
})
}

func alterChangefeedTypeCheck(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
) (matched bool, header colinfo.ResultColumns, _ error) {
Expand Down Expand Up @@ -116,8 +132,11 @@ func alterChangefeedPlanHook(
return err
}
exprEval := p.ExprEvaluator("ALTER CHANGEFEED")
telemetryAgg := &telemetryAggregator{
txn: p.Txn(),
}
newOptions, newSinkURI, err := generateNewOpts(
ctx, exprEval, alterChangefeedStmt.Cmds, prevOpts, prevDetails.SinkURI,
ctx, exprEval, alterChangefeedStmt.Cmds, prevOpts, prevDetails.SinkURI, telemetryAgg,
)
if err != nil {
return err
Expand All @@ -128,6 +147,7 @@ func alterChangefeedPlanHook(
alterChangefeedStmt.Cmds,
newOptions.AsMap(), // TODO: Remove .AsMap()
prevDetails, job.Progress(),
telemetryAgg,
)
if err != nil {
return err
Expand Down Expand Up @@ -200,8 +220,7 @@ func alterChangefeedPlanHook(
if err != nil {
return err
}

telemetry.Count(telemetryPath)
telemetryAgg.Count(telemetryPath)

select {
case <-ctx.Done():
Expand Down Expand Up @@ -253,6 +272,7 @@ func generateNewOpts(
alterCmds tree.AlterChangefeedCmds,
prevOpts map[string]string,
prevSinkURI string,
telemetryAgg *telemetryAggregator,
) (changefeedbase.StatementOptions, string, error) {
sinkURI := prevSinkURI
newOptions := prevOpts
Expand Down Expand Up @@ -298,7 +318,7 @@ func generateNewOpts(
newOptions[key] = value
}
}
telemetry.CountBucketed(telemetryPath+`.set_options`, int64(len(opts)))
telemetryAgg.CountBucketed(telemetryPath+`.set_options`, int64(len(opts)))
case *tree.AlterChangefeedUnsetOptions:
optKeys := v.Options.ToStrings()
for _, key := range optKeys {
Expand All @@ -313,7 +333,7 @@ func generateNewOpts(
}
delete(newOptions, key)
}
telemetry.CountBucketed(telemetryPath+`.unset_options`, int64(len(optKeys)))
telemetryAgg.CountBucketed(telemetryPath+`.unset_options`, int64(len(optKeys)))
}
}

Expand All @@ -328,6 +348,7 @@ func generateNewTargets(
opts map[string]string,
prevDetails jobspb.ChangefeedDetails,
prevProgress jobspb.Progress,
telemetryAgg *telemetryAggregator,
) (
tree.ChangefeedTargets,
*jobspb.Progress,
Expand Down Expand Up @@ -522,7 +543,7 @@ func generateNewTargets(
if err != nil {
return nil, nil, hlc.Timestamp{}, nil, err
}
telemetry.CountBucketed(telemetryPath+`.added_targets`, int64(len(v.Targets)))
telemetryAgg.CountBucketed(telemetryPath+`.added_targets`, int64(len(v.Targets)))
case *tree.AlterChangefeedDropTarget:
for _, target := range v.Targets {
desc, found, err := getTargetDesc(ctx, p, descResolver, target.TableName)
Expand All @@ -548,7 +569,7 @@ func generateNewTargets(
}
delete(newTargets, k)
}
telemetry.CountBucketed(telemetryPath+`.dropped_targets`, int64(len(v.Targets)))
telemetryAgg.CountBucketed(telemetryPath+`.dropped_targets`, int64(len(v.Targets)))
}
}

Expand Down
47 changes: 47 additions & 0 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,53 @@ func TestAlterChangefeedTelemetry(t *testing.T) {
cdcTest(t, testFn, feedTestEnterpriseSinks)
}

func TestAlterChangefeedTelemetryTxnRetry(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1)`)
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO bar VALUES (1)`)
sqlDB.Exec(t, `CREATE TABLE baz (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO baz VALUES (1)`)

// Reset the counts.
_ = telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo, bar WITH diff`)
defer closeFeed(t, testFeed)
feed := testFeed.(cdctest.EnterpriseTestFeed)
require.NoError(t, feed.Pause())

// Here we issue the alter stmt, rollback the txn, and
// issue it again.
alterStmt := fmt.Sprintf("ALTER CHANGEFEED %d DROP bar, foo ADD baz UNSET diff SET resolved, format=json", feed.JobID())
tx, err := s.DB.Begin()
require.NoError(t, err)
_, err = tx.Exec("SAVEPOINT a")
require.NoError(t, err)
_, err = tx.Exec(alterStmt)
require.NoError(t, err)
_, err = tx.Exec("ROLLBACK TO a")
require.NoError(t, err)
_, err = tx.Exec(alterStmt)
require.NoError(t, err)
require.NoError(t, tx.Commit())

counts := telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)
require.Equal(t, int32(1), counts[`changefeed.alter`])
require.Equal(t, int32(1), counts[`changefeed.alter.dropped_targets.2`])
require.Equal(t, int32(1), counts[`changefeed.alter.added_targets.1`])
require.Equal(t, int32(1), counts[`changefeed.alter.set_options.2`])
require.Equal(t, int32(1), counts[`changefeed.alter.unset_options.1`])
}

cdcTest(t, testFn, feedTestEnterpriseSinks)
}

// The purpose of this test is to ensure that the ALTER CHANGEFEED statement
// does not accidentally redact secret keys in the changefeed details
func TestAlterChangefeedPersistSinkURI(t *testing.T) {
Expand Down

0 comments on commit e1b2b67

Please sign in to comment.