Skip to content

Commit

Permalink
sql, *: allow table scopeing under pg_temp_<session_id> physical schema
Browse files Browse the repository at this point in the history
Previously, CRDB only supported the `public` physical schema. All table
entries in `system.namespace` assumed an implied `public` schema, so
name resolution for tables only required a databaseID and table name to
unqiely identify a table.

As highlighted in the temp tables RFC, temp tables will be scoped under
a session specific schema of the form `pg_temp_<session_id>`. This
motivated adding support for additional physical schemas.

This PR involves the following changes to `system.namespace`:

- A new `system.namespace` table is constructed for cluster versions
>= 20.1, which has an additional primary key column `publicSchemaID`.

- The older `system.namespace` table is marked deprecated. All
`system.namespace` accesses for cluster versions < 20.1 continue to
refer to this deprecated table. This is invisible to users.

- The new `system.namespace` table is moved out of the SystemConfig
range. This means `system.namespace` is no longer gossiped for cluster
versions >= 20.1 .

- Every new database creation adds the `public` schema to
`system.namespace` by default.

The `searchPath` is responsible for establishing the order in which
schemas are searched during name resolution. This PR involves the
following changes to the `searchPath`:

- The search semantics are updated to match those described in the temp
tables RFC.

- The search path is now aware of the session specific temporary schema,
which can be used during name resolution.

- The distSQL api proto had to be updated to pass the temporary schema to
other nodes in addition to the list of paths.

Follow-up work:

- The `TableCollection` cache needs to be updated to have knowledge about
schemaIDs. Once this is done, there is a TODO in the code that allows the
`CachedPhysicalAccessor` to work correctly.

- Migration for clusters upgrading to 20.1. The new `system.namespace`
table needs to be populated from the deprecated table and a `public`
schema needs to be added for every database during migration.

Release note (sql change): CREATE TABLE pg_temp.abc(a int) now creates
a temporary table. See temp tables RFC (guide-level explanation) for
more details about the search path semantics.
  • Loading branch information
arulajmani committed Nov 1, 2019
1 parent a576473 commit ec15991
Show file tree
Hide file tree
Showing 89 changed files with 1,624 additions and 726 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,6 @@
<tr><td><code>trace.debug.enable</code></td><td>boolean</td><td><code>false</code></td><td>if set, traces for recent requests can be seen in the /debug page</td></tr>
<tr><td><code>trace.lightstep.token</code></td><td>string</td><td><code></code></td><td>if set, traces go to Lightstep using this token</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>if set, traces go to the given Zipkin instance (example: '127.0.0.1:9411'); ignored if trace.lightstep.token is set</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>19.1-11</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>custom validation</td><td><code>20.1</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
6 changes: 4 additions & 2 deletions pkg/ccl/backupccl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -1370,10 +1370,12 @@ func (b *backupResumer) Resume(
}

// OnFailOrCancel is part of the jobs.Resumer interface.
func (b *backupResumer) OnFailOrCancel(context.Context, *client.Txn) error { return nil }
func (b *backupResumer) OnFailOrCancel(context.Context, *client.Txn, *cluster.Settings) error {
return nil
}

// OnSuccess is part of the jobs.Resumer interface.
func (b *backupResumer) OnSuccess(context.Context, *client.Txn) error { return nil }
func (b *backupResumer) OnSuccess(context.Context, *client.Txn, *cluster.Settings) error { return nil }

// OnTerminal is part of the jobs.Resumer interface.
func (b *backupResumer) OnTerminal(
Expand Down
28 changes: 18 additions & 10 deletions pkg/ccl/backupccl/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,7 @@ func allocateTableRewrites(
if err := p.ExecCfg().DB.Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
// Check that any DBs being restored do _not_ exist.
for name := range restoreDBNames {
dKey := sqlbase.NewDatabaseKey(name)
dKey := sqlbase.NewDatabaseKey(name, p.ExecCfg().Settings)
existingDatabaseID, err := txn.Get(ctx, dKey.Key())
if err != nil {
return err
Expand Down Expand Up @@ -397,7 +397,7 @@ func allocateTableRewrites(
} else {
var parentID sqlbase.ID
{
dKey := sqlbase.NewDatabaseKey(targetDB)
dKey := sqlbase.NewDatabaseKey(targetDB, p.ExecCfg().Settings)
existingDatabaseID, err := txn.Get(ctx, dKey.Key())
if err != nil {
return err
Expand All @@ -416,7 +416,7 @@ func allocateTableRewrites(

// Check that the table name is _not_ in use.
// This would fail the CPut later anyway, but this yields a prettier error.
if err := CheckTableExists(ctx, txn, parentID, table.Name); err != nil {
if err := CheckTableExists(ctx, txn, p.ExecCfg().Settings, parentID, table.Name); err != nil {
return err
}

Expand Down Expand Up @@ -486,9 +486,13 @@ func allocateTableRewrites(
// CheckTableExists returns an error if a table already exists with given
// parent and name.
func CheckTableExists(
ctx context.Context, txn *client.Txn, parentID sqlbase.ID, name string,
ctx context.Context,
txn *client.Txn,
settings *cluster.Settings,
parentID sqlbase.ID,
name string,
) error {
tKey := sqlbase.NewTableKey(parentID, name)
tKey := sqlbase.NewPublicTableKey(parentID, name, settings)
res, err := txn.Get(ctx, tKey.Key())
if err != nil {
return err
Expand Down Expand Up @@ -1057,7 +1061,7 @@ func WriteTableDescs(
if err := sql.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, desc.ID, desc); err != nil {
return err
}
b.CPut(sqlbase.NewDatabaseKey(desc.Name).Key(), desc.ID, nil)
b.CPut(sqlbase.NewDatabaseKey(desc.Name, settings).Key(), desc.ID, nil)
}
for i := range tables {
if wrote, ok := wroteDBs[tables[i].ParentID]; ok {
Expand All @@ -1079,7 +1083,7 @@ func WriteTableDescs(
if err := sql.WriteNewDescToBatch(ctx, false /* kvTrace */, settings, b, tables[i].ID, tables[i]); err != nil {
return err
}
b.CPut(sqlbase.NewTableKey(tables[i].ParentID, tables[i].Name).Key(), tables[i].ID, nil)
b.CPut(sqlbase.NewPublicTableKey(tables[i].ParentID, tables[i].Name, settings).Key(), tables[i].ID, nil)
}
for _, kv := range extra {
b.InitPut(kv.Key, &kv.Value, false)
Expand Down Expand Up @@ -1770,7 +1774,9 @@ func (r *restoreResumer) Resume(
// has been committed from a restore that has failed or been canceled. It does
// this by adding the table descriptors in DROP state, which causes the schema
// change stuff to delete the keys in the background.
func (r *restoreResumer) OnFailOrCancel(ctx context.Context, txn *client.Txn) error {
func (r *restoreResumer) OnFailOrCancel(
ctx context.Context, txn *client.Txn, settings *cluster.Settings,
) error {
details := r.job.Details().(jobspb.RestoreDetails)

// No need to mark the tables as dropped if they were not even created in the
Expand All @@ -1791,7 +1797,7 @@ func (r *restoreResumer) OnFailOrCancel(ctx context.Context, txn *client.Txn) er
var existingIDVal roachpb.Value
existingIDVal.SetInt(int64(tableDesc.ID))
b.CPut(
sqlbase.NewTableKey(tableDesc.ParentID, tableDesc.Name).Key(),
sqlbase.NewPublicTableKey(tableDesc.ParentID, tableDesc.Name, settings).Key(),
nil,
&existingIDVal,
)
Expand All @@ -1813,7 +1819,9 @@ func (r *restoreResumer) OnFailOrCancel(ctx context.Context, txn *client.Txn) er
}

// OnSuccess is part of the jobs.Resumer interface.
func (r *restoreResumer) OnSuccess(ctx context.Context, txn *client.Txn) error {
func (r *restoreResumer) OnSuccess(
ctx context.Context, txn *client.Txn, _ *cluster.Settings,
) error {
log.Event(ctx, "making tables live")

if err := stats.InsertNewStats(ctx, r.exec, txn, r.latestStats); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/targets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func TestDescriptorsMatchingTargets(t *testing.T) {
// {"", `TABLE system."FOO"`, []string{"system"}},
// {"system", `TABLE "FOO"`, []string{"system"}},
}
searchPath := sessiondata.MakeSearchPath([]string{"public", "pg_catalog"})
searchPath := sessiondata.MakeSearchPath([]string{"public", "pg_catalog"}, sessiondata.DefaultTemporarySchema)
for i, test := range tests {
t.Run(fmt.Sprintf("%d/%s/%s", i, test.sessionDatabase, test.pattern), func(t *testing.T) {
sql := fmt.Sprintf(`GRANT ALL ON %s TO ignored`, test.pattern)
Expand Down
8 changes: 6 additions & 2 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,10 +552,14 @@ func (b *changefeedResumer) Resume(
}

// OnFailOrCancel is part of the jobs.Resumer interface.
func (b *changefeedResumer) OnFailOrCancel(context.Context, *client.Txn) error { return nil }
func (b *changefeedResumer) OnFailOrCancel(context.Context, *client.Txn, *cluster.Settings) error {
return nil
}

// OnSuccess is part of the jobs.Resumer interface.
func (b *changefeedResumer) OnSuccess(context.Context, *client.Txn) error { return nil }
func (b *changefeedResumer) OnSuccess(context.Context, *client.Txn, *cluster.Settings) error {
return nil
}

// OnTerminal is part of the jobs.Resumer interface.
func (b *changefeedResumer) OnTerminal(context.Context, jobs.Status, chan<- tree.Datums) {}
10 changes: 6 additions & 4 deletions pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,7 +614,7 @@ func prepareNewTableDescsForIngestion(
) ([]*sqlbase.TableDescriptor, error) {
var tableDescs []*sqlbase.TableDescriptor
for _, i := range tables {
if err := backupccl.CheckTableExists(ctx, txn, parentID, i.Desc.Name); err != nil {
if err := backupccl.CheckTableExists(ctx, txn, p.ExecCfg().Settings, parentID, i.Desc.Name); err != nil {
return nil, err
}
tableDescs = append(tableDescs, i.Desc)
Expand Down Expand Up @@ -865,7 +865,9 @@ func (r *importResumer) Resume(
// been committed from a import that has failed or been canceled. It does this
// by adding the table descriptors in DROP state, which causes the schema change
// stuff to delete the keys in the background.
func (r *importResumer) OnFailOrCancel(ctx context.Context, txn *client.Txn) error {
func (r *importResumer) OnFailOrCancel(
ctx context.Context, txn *client.Txn, settings *cluster.Settings,
) error {
details := r.job.Details().(jobspb.ImportDetails)

// Needed to trigger the schema change manager.
Expand Down Expand Up @@ -921,7 +923,7 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, txn *client.Txn) err
tableDesc.DropTime = 1
var existingIDVal roachpb.Value
existingIDVal.SetInt(int64(tableDesc.ID))
tKey := sqlbase.NewTableKey(tableDesc.ParentID, tableDesc.Name)
tKey := sqlbase.NewPublicTableKey(tableDesc.ParentID, tableDesc.Name, settings)
b.CPut(tKey.Key(), nil, &existingIDVal)
} else {
// IMPORT did not create this table, so we should not drop it.
Expand All @@ -944,7 +946,7 @@ func (r *importResumer) OnFailOrCancel(ctx context.Context, txn *client.Txn) err
}

// OnSuccess is part of the jobs.Resumer interface.
func (r *importResumer) OnSuccess(ctx context.Context, txn *client.Txn) error {
func (r *importResumer) OnSuccess(ctx context.Context, txn *client.Txn, _ *cluster.Settings) error {
log.Event(ctx, "making tables live")
details := r.job.Details().(jobspb.ImportDetails)

Expand Down
6 changes: 5 additions & 1 deletion pkg/ccl/partitionccl/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1095,10 +1095,14 @@ func verifyScansOnNode(
}
traceLines = append(traceLines, traceLine.String)
if strings.Contains(traceLine.String, "read completed") {
if strings.Contains(traceLine.String, "SystemCon") {
if strings.Contains(traceLine.String, "SystemCon") || strings.Contains(traceLine.String, "NamespaceTab") {
// Ignore trace lines for the system config range (abbreviated as
// "SystemCon" in pretty printing of the range descriptor). A read might
// be performed to the system config range to update the table lease.
//
// Also ignore trace lines for the system.namespace table, which is a
// system table that resides outside the system config range. (abbreviated
// as "NamespaceTab" in pretty printing of the range descriptor).
continue
}
if !strings.Contains(traceLine.String, node) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2215,6 +2215,8 @@ writing ` + os.DevNull + `
debug/nodes/1/ranges/22.json
debug/nodes/1/ranges/23.json
debug/nodes/1/ranges/24.json
debug/nodes/1/ranges/25.json
debug/nodes/1/ranges/26.json
debug/schema/[email protected]
debug/schema/[email protected]
debug/schema/[email protected]
Expand All @@ -2225,6 +2227,7 @@ writing ` + os.DevNull + `
debug/schema/system/lease.json
debug/schema/system/locations.json
debug/schema/system/namespace.json
debug/schema/system/namespace_deprecated.json
debug/schema/system/rangelog.json
debug/schema/system/replication_constraint_stats.json
debug/schema/system/replication_critical_localities.json
Expand Down
20 changes: 11 additions & 9 deletions pkg/cli/sql_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ SET

expectedRows := [][]string{
{`parentID`, `INT8`, `false`, `NULL`, ``, `{primary}`, `false`},
{`parentSchemaID`, `INT8`, `false`, `NULL`, ``, `{primary}`, `false`},
{`name`, `STRING`, `false`, `NULL`, ``, `{primary}`, `false`},
{`id`, `INT8`, `true`, `NULL`, ``, `{}`, `false`},
}
Expand All @@ -163,12 +164,13 @@ SET
}

expected = `
column_name | data_type | is_nullable | column_default | generation_expression | indices | is_hidden
+-------------+-----------+-------------+----------------+-----------------------+-----------+-----------+
parentID | INT8 | false | NULL | | {primary} | false
name | STRING | false | NULL | | {primary} | false
id | INT8 | true | NULL | | {} | false
(3 rows)
column_name | data_type | is_nullable | column_default | generation_expression | indices | is_hidden
+----------------+-----------+-------------+----------------+-----------------------+-----------+-----------+
parentID | INT8 | false | NULL | | {primary} | false
parentSchemaID | INT8 | false | NULL | | {primary} | false
name | STRING | false | NULL | | {primary} | false
id | INT8 | true | NULL | | {} | false
(4 rows)
`

if a, e := b.String(), expected[1:]; a != e {
Expand All @@ -183,9 +185,9 @@ SET
}

expected = `
parentID | name | id
+----------+------------+----+
1 | descriptor | 3
parentID | parentSchemaID | name | id
+----------+----------------+------------+----+
1 | 29 | descriptor | 3
(1 row)
`
if a, e := b.String(), expected[1:]; a != e {
Expand Down
6 changes: 4 additions & 2 deletions pkg/config/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,9 @@ func TestComputeSplitKeyTableIDs(t *testing.T) {
kvs, _ /* splits */ := schema.GetInitialValues()
userSQL := append(kvs, descriptor(start), descriptor(start+1), descriptor(start+5))
// Real system tables and partitioned user tables.
subzoneSQL := append(userSQL,
var userSQLCopy = make([]roachpb.KeyValue, len(userSQL))
copy(userSQLCopy, userSQL)
subzoneSQL := append(userSQLCopy,
zoneConfig(start+1, subzone("a", ""), subzone("c", "e")),
zoneConfig(start+5, subzone("b", ""), subzone("c", "d"), subzone("d", "")))

Expand Down Expand Up @@ -404,13 +406,13 @@ func TestGetZoneConfigForKey(t *testing.T) {
{roachpb.RKey(keys.SystemConfigSplitKey), keys.SystemDatabaseID},

// Gossiped system tables should refer to the SystemDatabaseID.
{tkey(keys.NamespaceTableID), keys.SystemDatabaseID},
{tkey(keys.ZonesTableID), keys.SystemDatabaseID},

// Non-gossiped system tables should refer to themselves.
{tkey(keys.LeaseTableID), keys.LeaseTableID},
{tkey(keys.JobsTableID), keys.JobsTableID},
{tkey(keys.LocationsTableID), keys.LocationsTableID},
{tkey(keys.NamespaceTableID), keys.NamespaceTableID},

// Pseudo-tables should refer to the SystemDatabaseID.
{tkey(keys.MetaRangesID), keys.SystemDatabaseID},
Expand Down
7 changes: 4 additions & 3 deletions pkg/jobs/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
)

Expand All @@ -41,21 +42,21 @@ func (d FakeResumer) Resume(_ context.Context, _ interface{}, _ chan<- tree.Datu
return nil
}

func (d FakeResumer) OnFailOrCancel(_ context.Context, _ *client.Txn) error {
func (d FakeResumer) OnFailOrCancel(_ context.Context, _ *client.Txn, _ *cluster.Settings) error {
if d.Fail != nil {
return d.Fail()
}
return nil
}

func (d FakeResumer) OnSuccess(_ context.Context, _ *client.Txn) error {
func (d FakeResumer) OnSuccess(context.Context, *client.Txn, *cluster.Settings) error {
if d.Success != nil {
return d.Success()
}
return nil
}

func (d FakeResumer) OnTerminal(_ context.Context, _ Status, _ chan<- tree.Datums) {
func (d FakeResumer) OnTerminal(context.Context, Status, chan<- tree.Datums) {
if d.Terminal != nil {
d.Terminal()
}
Expand Down
19 changes: 12 additions & 7 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -314,7 +315,9 @@ func (j *Job) resumed(ctx context.Context) error {
// Canceled sets the status of the tracked job to canceled. It does not directly
// cancel the job; like job.Paused, it expects the job to call job.Progressed
// soon, observe a "job is canceled" error, and abort further work.
func (j *Job) canceled(ctx context.Context, fn func(context.Context, *client.Txn) error) error {
func (j *Job) canceled(
ctx context.Context, fn func(context.Context, *client.Txn, *cluster.Settings) error,
) error {
return j.Update(ctx, func(txn *client.Txn, md JobMetadata, ju *JobUpdater) error {
if md.Status == StatusCanceled {
// Already canceled - do nothing.
Expand All @@ -327,7 +330,7 @@ func (j *Job) canceled(ctx context.Context, fn func(context.Context, *client.Txn
return fmt.Errorf("job with status %s cannot be canceled", md.Status)
}
if fn != nil {
if err := fn(ctx, txn); err != nil {
if err := fn(ctx, txn, j.registry.settings); err != nil {
return err
}
}
Expand All @@ -340,18 +343,18 @@ func (j *Job) canceled(ctx context.Context, fn func(context.Context, *client.Txn

// NoopFn is an empty function that can be used for Failed and Succeeded. It indicates
// no transactional callback should be made during these operations.
var NoopFn = func(context.Context, *client.Txn) error { return nil }
var NoopFn = func(context.Context, *client.Txn, *cluster.Settings) error { return nil }

// Failed marks the tracked job as having failed with the given error.
func (j *Job) Failed(
ctx context.Context, err error, fn func(context.Context, *client.Txn) error,
ctx context.Context, err error, fn func(context.Context, *client.Txn, *cluster.Settings) error,
) error {
return j.Update(ctx, func(txn *client.Txn, md JobMetadata, ju *JobUpdater) error {
if md.Status.Terminal() {
// Already done - do nothing.
return nil
}
if err := fn(ctx, txn); err != nil {
if err := fn(ctx, txn, j.registry.settings); err != nil {
return err
}
ju.UpdateStatus(StatusFailed)
Expand All @@ -364,13 +367,15 @@ func (j *Job) Failed(

// Succeeded marks the tracked job as having succeeded and sets its fraction
// completed to 1.0.
func (j *Job) Succeeded(ctx context.Context, fn func(context.Context, *client.Txn) error) error {
func (j *Job) Succeeded(
ctx context.Context, fn func(context.Context, *client.Txn, *cluster.Settings) error,
) error {
return j.Update(ctx, func(txn *client.Txn, md JobMetadata, ju *JobUpdater) error {
if md.Status.Terminal() {
// Already done - do nothing.
return nil
}
if err := fn(ctx, txn); err != nil {
if err := fn(ctx, txn, j.registry.settings); err != nil {
return err
}
ju.UpdateStatus(StatusSucceeded)
Expand Down
4 changes: 2 additions & 2 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ type Resumer interface {
//
// Any work this function does must still be correct if the txn is aborted at
// a later time.
OnSuccess(ctx context.Context, txn *client.Txn) error
OnSuccess(ctx context.Context, txn *client.Txn, settings *cluster.Settings) error

// OnTerminal is called after a job has successfully been marked as
// terminal. It should be used to perform optional cleanup and return final
Expand All @@ -533,7 +533,7 @@ type Resumer interface {
// This method can be called during cancellation, which is not guaranteed to
// run on the node where the job is running. So it cannot assume that any
// other methods have been called on this Resumer object.
OnFailOrCancel(ctx context.Context, txn *client.Txn) error
OnFailOrCancel(ctx context.Context, txn *client.Txn, settings *cluster.Settings) error
}

// Constructor creates a resumable job of a certain type. The Resumer is
Expand Down
Loading

0 comments on commit ec15991

Please sign in to comment.