Skip to content

Commit

Permalink
Merge #82634 #83592
Browse files Browse the repository at this point in the history
82634: cdc: updated the initial scan syntax for ALTER changefeeds r=surajr10 a=surajr10

updated the initial scan syntax for ALTER changefeeds to handle initial_scan, no_initial_scan, initial_scan = 'yes|no|only' options and added tests for this

Release note: None

83592: sql: refactor MetadataUpdater to properly share txns/collections r=fqazi a=fqazi

Previously, the code that we added for the metadata updater, which
is used to update descriptor metadata had code paths that could
end up creating new transactions/collections. This was problematic because
those updates were not executing under the same user transaction.
This patch simplifies the metadata updater logic to no longer
have a factory and share collections/transactions properly.

Release note: None

Co-authored-by: Suraj <[email protected]>
Co-authored-by: Faizan Qazi <[email protected]>
  • Loading branch information
3 people committed Jun 30, 2022
3 parents e461d15 + 5b1d968 + 0d015a6 commit e1e99da
Show file tree
Hide file tree
Showing 28 changed files with 254 additions and 253 deletions.
50 changes: 47 additions & 3 deletions pkg/ccl/changefeedccl/alter_changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,12 +310,20 @@ func generateNewTargets(
// name of the target was modified.
originalSpecs := make(map[tree.ChangefeedTarget]jobspb.ChangefeedTargetSpecification)

// We want to store the value of whether or not the original changefeed had
// initial_scan set to only so that we only do an initial scan on an alter
// changefeed with initial_scan = 'only' if the original one also had
// initial_scan = 'only'.
_, originalInitialScanOnlyOption := opts[changefeedbase.OptInitialScanOnly]

// When we add new targets with or without initial scans, indicating
// initial_scan or no_initial_scan in the job description would lose its
// meaning. Hence, we will omit these details from the changefeed
// description. However, to ensure that we do perform the initial scan on
// newly added targets, we will introduce the initial_scan opt after the
// job record is created.

delete(opts, changefeedbase.OptInitialScanOnly)
delete(opts, changefeedbase.OptNoInitialScan)
delete(opts, changefeedbase.OptInitialScan)

Expand Down Expand Up @@ -384,16 +392,52 @@ func generateNewTargets(
return nil, nil, hlc.Timestamp{}, nil, err
}

_, withInitialScan := targetOpts[changefeedbase.OptInitialScan]
_, noInitialScan := targetOpts[changefeedbase.OptNoInitialScan]
if withInitialScan && noInitialScan {
var withInitialScan bool
initialScanType, initialScanSet := targetOpts[changefeedbase.OptInitialScan]
_, noInitialScanSet := targetOpts[changefeedbase.OptNoInitialScan]
_, initialScanOnlySet := targetOpts[changefeedbase.OptInitialScanOnly]

if initialScanSet {
if initialScanType == `no` || (initialScanType == `only` && !originalInitialScanOnlyOption) {
withInitialScan = false
} else {
withInitialScan = true
}
} else {
withInitialScan = false
}

if initialScanType != `` && initialScanType != `yes` && initialScanType != `no` && initialScanType != `only` {
return nil, nil, hlc.Timestamp{}, nil, pgerror.Newf(
pgcode.InvalidParameterValue,
`cannot set initial_scan to %q. possible values for initial_scan are "yes", "no", "only", or no value`, changefeedbase.OptInitialScan,
)
}

if initialScanSet && noInitialScanSet {
return nil, nil, hlc.Timestamp{}, nil, pgerror.Newf(
pgcode.InvalidParameterValue,
`cannot specify both %q and %q`, changefeedbase.OptInitialScan,
changefeedbase.OptNoInitialScan,
)
}

if initialScanSet && initialScanOnlySet {
return nil, nil, hlc.Timestamp{}, nil, pgerror.Newf(
pgcode.InvalidParameterValue,
`cannot specify both %q and %q`, changefeedbase.OptInitialScan,
changefeedbase.OptInitialScanOnly,
)
}

if noInitialScanSet && initialScanOnlySet {
return nil, nil, hlc.Timestamp{}, nil, pgerror.Newf(
pgcode.InvalidParameterValue,
`cannot specify both %q and %q`, changefeedbase.OptInitialScanOnly,
changefeedbase.OptNoInitialScan,
)
}

var existingTargetDescs []catalog.Descriptor
for _, targetDesc := range newTableDescs {
existingTargetDescs = append(existingTargetDescs, targetDesc)
Expand Down
138 changes: 53 additions & 85 deletions pkg/ccl/changefeedccl/alter_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -894,91 +894,6 @@ func TestAlterChangefeedAlterTableName(t *testing.T) {
cdcTest(t, testFn, feedTestEnterpriseSinks)
}

func TestAlterChangefeedInitialScan(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1), (2), (3)`)
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO bar VALUES (1), (2), (3)`)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '1s', no_initial_scan`)
defer closeFeed(t, testFeed)

expectResolvedTimestamp(t, testFeed)

feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
require.True(t, ok)

sqlDB.Exec(t, `PAUSE JOB $1`, feed.JobID())
waitForJobStatus(sqlDB, t, feed.JobID(), `paused`)

sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d ADD bar WITH initial_scan`, feed.JobID()))

sqlDB.Exec(t, fmt.Sprintf(`RESUME JOB %d`, feed.JobID()))
waitForJobStatus(sqlDB, t, feed.JobID(), `running`)

assertPayloads(t, testFeed, []string{
`bar: [1]->{"after": {"a": 1}}`,
`bar: [2]->{"after": {"a": 2}}`,
`bar: [3]->{"after": {"a": 3}}`,
})

sqlDB.Exec(t, `INSERT INTO bar VALUES (4)`)
assertPayloads(t, testFeed, []string{
`bar: [4]->{"after": {"a": 4}}`,
})
}

cdcTest(t, testFn, feedTestEnterpriseSinks)
}

func TestAlterChangefeedNoInitialScan(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1), (2), (3)`)
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO bar VALUES (1), (2), (3)`)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '1s'`)
defer closeFeed(t, testFeed)

assertPayloads(t, testFeed, []string{
`foo: [1]->{"after": {"a": 1}}`,
`foo: [2]->{"after": {"a": 2}}`,
`foo: [3]->{"after": {"a": 3}}`,
})
expectResolvedTimestamp(t, testFeed)

feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
require.True(t, ok)

sqlDB.Exec(t, `PAUSE JOB $1`, feed.JobID())
waitForJobStatus(sqlDB, t, feed.JobID(), `paused`)

sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d ADD bar WITH no_initial_scan`, feed.JobID()))

sqlDB.Exec(t, fmt.Sprintf(`RESUME JOB %d`, feed.JobID()))
waitForJobStatus(sqlDB, t, feed.JobID(), `running`)

expectResolvedTimestamp(t, testFeed)

sqlDB.Exec(t, `INSERT INTO bar VALUES (4)`)
assertPayloads(t, testFeed, []string{
`bar: [4]->{"after": {"a": 4}}`,
})
}

cdcTest(t, testFn, feedTestEnterpriseSinks)
}

func TestAlterChangefeedAddTargetsDuringSchemaChangeError(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down Expand Up @@ -1354,3 +1269,56 @@ func TestAlterChangefeedUpdateFilter(t *testing.T) {

cdcTest(t, testFn, feedTestEnterpriseSinks)
}

func TestAlterChangefeedInitialScan(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(initialScanOption string) cdcTestFn {
return func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO foo VALUES (1), (2), (3)`)
sqlDB.Exec(t, `CREATE TABLE bar (a INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO bar VALUES (1), (2), (3)`)

testFeed := feed(t, f, `CREATE CHANGEFEED FOR foo WITH resolved = '1s', no_initial_scan`)
defer closeFeed(t, testFeed)

expectResolvedTimestamp(t, testFeed)

feed, ok := testFeed.(cdctest.EnterpriseTestFeed)
require.True(t, ok)

sqlDB.Exec(t, `PAUSE JOB $1`, feed.JobID())
waitForJobStatus(sqlDB, t, feed.JobID(), `paused`)

sqlDB.Exec(t, fmt.Sprintf(`ALTER CHANGEFEED %d ADD bar WITH %s`, feed.JobID(), initialScanOption))

sqlDB.Exec(t, fmt.Sprintf(`RESUME JOB %d`, feed.JobID()))
waitForJobStatus(sqlDB, t, feed.JobID(), `running`)

expectPayloads := (initialScanOption == "initial_scan = 'yes'" || initialScanOption == "initial_scan")
if expectPayloads {
assertPayloads(t, testFeed, []string{
`bar: [1]->{"after": {"a": 1}}`,
`bar: [2]->{"after": {"a": 2}}`,
`bar: [3]->{"after": {"a": 3}}`,
})
}

sqlDB.Exec(t, `INSERT INTO bar VALUES (4)`)
assertPayloads(t, testFeed, []string{
`bar: [4]->{"after": {"a": 4}}`,
})
}
}

for _, initialScanOpt := range []string{
"initial_scan = 'yes'",
"initial_scan = 'no'",
"initial_scan",
"no_initial_scan"} {
cdcTest(t, testFn(initialScanOpt), feedTestForceSink("kafka"))
}
}
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeedbase/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ var AlterChangefeedOptionExpectValues = func() map[string]OptionPermittedValues
// AlterChangefeedTargetOptions is used to parse target specific alter
// changefeed options using PlanHookState.TypeAsStringOpts().
var AlterChangefeedTargetOptions = map[string]OptionPermittedValues{
OptInitialScan: flagOption,
OptInitialScan: enum("yes", "no", "only").orEmptyMeans("yes"),
OptNoInitialScan: flagOption,
}

Expand Down
1 change: 0 additions & 1 deletion pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ go_library(
"//pkg/sql/consistencychecker",
"//pkg/sql/contention",
"//pkg/sql/contentionpb",
"//pkg/sql/descmetadata",
"//pkg/sql/distsql",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
Expand Down
7 changes: 0 additions & 7 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/colexec"
"github.com/cockroachdb/cockroach/pkg/sql/consistencychecker"
"github.com/cockroachdb/cockroach/pkg/sql/contention"
"github.com/cockroachdb/cockroach/pkg/sql/descmetadata"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
Expand Down Expand Up @@ -932,12 +931,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
sql.ValidateInvertedIndexes,
sql.NewFakeSessionData,
)

execCfg.DescMetadaUpdaterFactory = descmetadata.NewMetadataUpdaterFactory(
ieFactory,
collectionFactory,
&execCfg.Settings.SV,
)
execCfg.InternalExecutorFactory = ieFactory

distSQLServer.ServerConfig.ProtectedTimestampProvider = execCfg.ProtectedTimestampProvider
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ go_library(
"//pkg/sql/covering",
"//pkg/sql/decodeusername",
"//pkg/sql/delegate",
"//pkg/sql/descmetadata",
"//pkg/sql/distsql",
"//pkg/sql/enum",
"//pkg/sql/evalcatalog",
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/comment_on_constraint.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/descmetadata"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
Expand Down Expand Up @@ -57,8 +58,11 @@ func (p *planner) CommentOnConstraint(
return &commentOnConstraintNode{
n: n,
tableDesc: tableDesc,
metadataUpdater: p.execCfg.DescMetadaUpdaterFactory.NewMetadataUpdater(
metadataUpdater: descmetadata.NewMetadataUpdater(
ctx,
p.ExecCfg().InternalExecutorFactory,
p.Descriptors(),
&p.ExecCfg().Settings.SV,
p.txn,
p.SessionData(),
),
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/comment_on_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/descmetadata"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -52,8 +53,11 @@ func (p *planner) CommentOnDatabase(

return &commentOnDatabaseNode{n: n,
dbDesc: dbDesc,
metadataUpdater: p.execCfg.DescMetadaUpdaterFactory.NewMetadataUpdater(
metadataUpdater: descmetadata.NewMetadataUpdater(
ctx,
p.ExecCfg().InternalExecutorFactory,
p.Descriptors(),
&p.ExecCfg().Settings.SV,
p.txn,
p.SessionData(),
),
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/comment_on_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/descmetadata"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -49,8 +50,11 @@ func (p *planner) CommentOnIndex(ctx context.Context, n *tree.CommentOnIndex) (p
n: n,
tableDesc: tableDesc,
index: index,
metadataUpdater: p.execCfg.DescMetadaUpdaterFactory.NewMetadataUpdater(
metadataUpdater: descmetadata.NewMetadataUpdater(
ctx,
p.ExecCfg().InternalExecutorFactory,
p.Descriptors(),
&p.ExecCfg().Settings.SV,
p.txn,
p.SessionData(),
)}, nil
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/comment_on_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/descmetadata"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
Expand Down Expand Up @@ -71,8 +72,11 @@ func (p *planner) CommentOnSchema(ctx context.Context, n *tree.CommentOnSchema)
return &commentOnSchemaNode{
n: n,
schemaDesc: schemaDesc,
metadataUpdater: p.execCfg.DescMetadaUpdaterFactory.NewMetadataUpdater(
metadataUpdater: descmetadata.NewMetadataUpdater(
ctx,
p.ExecCfg().InternalExecutorFactory,
p.Descriptors(),
&p.ExecCfg().Settings.SV,
p.txn,
p.SessionData(),
),
Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/comment_on_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/descmetadata"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scexec"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -52,8 +53,11 @@ func (p *planner) CommentOnTable(ctx context.Context, n *tree.CommentOnTable) (p
return &commentOnTableNode{
n: n,
tableDesc: tableDesc,
metadataUpdater: p.execCfg.DescMetadaUpdaterFactory.NewMetadataUpdater(
metadataUpdater: descmetadata.NewMetadataUpdater(
ctx,
p.ExecCfg().InternalExecutorFactory,
p.Descriptors(),
&p.ExecCfg().Settings.SV,
p.txn,
p.SessionData(),
),
Expand Down
1 change: 0 additions & 1 deletion pkg/sql/descmetadata/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ go_library(
srcs = [
"comment_cache.go",
"metadata_updater.go",
"metadata_updater_factory.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/descmetadata",
visibility = ["//visibility:public"],
Expand Down
Loading

0 comments on commit e1e99da

Please sign in to comment.