73537: sql: insert missing public schema entry migration r=ajwerner a=RichardJCai

Previously, when restoring a database, a namespace entry for the public
schema was not created. This commit adds a migration that inserts the
missing entries.

Release note: None
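
For illustration, a minimal sketch of the shape of the fix, assuming the set
of affected databases is discovered elsewhere (the function name and `dbIDs`
argument are hypothetical; the key construction mirrors the new test in this
change, and `keys.PublicSchemaID` is the public schema's fixed descriptor
ID, 29):

```go
package sketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
	"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
)

// insertMissingPublicSchemaEntries writes the system.namespace row mapping
// (dbID, "public") to the fixed public schema ID for each database that is
// missing one. How the affected databases are discovered is outside this
// sketch; the real migration finds them itself.
func insertMissingPublicSchemaEntries(ctx context.Context, db *kv.DB, dbIDs []descpb.ID) error {
	return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
		b := txn.NewBatch()
		for _, dbID := range dbIDs {
			key := catalogkeys.MakeSchemaNameKey(keys.SystemSQLCodec, dbID, "public")
			b.Put(key, keys.PublicSchemaID) // 29
		}
		return txn.Run(ctx, b)
	})
}
```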

73614: kvserver: alter balanceScore result to classify stores into 3 buckets r=aayushshah15 a=aayushshah15

In #65379 we changed `balanceScore()` to classify stores in a cluster
into 4 buckets: underfull, less-than-mean, more-than-mean, and overfull.
This allowed replicas to thrash around the mean: they could ping-pong
between stores classified as less-than-mean and more-than-mean.

This patch reverts `balanceScore()` to its previous incarnation, which
divided stores into only 3 buckets.

Release justification: Fixes regression introduced in a previous patch

Release note: None
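
As a schematic of the three-bucket shape (the bucket names and the tolerance
around the mean here are assumptions, not the allocator's actual constants):

```go
package sketch

// balanceStatus stands in for the allocator's bucket type.
type balanceStatus int

const (
	underfull balanceStatus = iota
	aroundTheMean
	overfull
)

// balanceScore classifies a store's range count relative to the cluster
// mean. With a single "around the mean" bucket there is no
// less-than-mean/more-than-mean boundary for replicas to ping-pong across.
func balanceScore(rangeCount, mean float64) balanceStatus {
	const tolerance = 0.05 // assumed slack around the mean
	switch {
	case rangeCount < mean*(1-tolerance):
		return underfull
	case rangeCount > mean*(1+tolerance):
		return overfull
	default:
		return aroundTheMean
	}
}
```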

73754: keys,*: adopt SystemIDChecker r=postamar a=postamar

This commit removes many references to the `keys` package constants 49, 50
and 54 (`MaxReservedDescID`, `MinUserDescID`, and `MinNonPredefinedUserDescID`),
and replaces them with functions that take a `SystemIDChecker`. Existing
functionality should remain unchanged.

This is a prerequisite to making the system descriptor ID space dynamic.

Release note: None
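
The pattern, roughly — the interface shape shown here is an assumption for
illustration; the real `SystemIDChecker` lives in `pkg/keys`:

```go
package sketch

// SystemIDChecker abstracts the question previously answered by comparing
// IDs against hard-coded boundaries of the reserved descriptor ID space.
type SystemIDChecker interface {
	IsSystemID(id uint32) bool
}

// isSystemDescriptor delegates to the checker instead of writing
// `id <= 49` (keys.MaxReservedDescID) inline, so the boundary of the
// system descriptor ID space can later become dynamic.
func isSystemDescriptor(c SystemIDChecker, id uint32) bool {
	return c.IsSystemID(id)
}
```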

73767: backupccl: remove stitching queue file count ceiling r=dt a=adityamaru

This change removes the `maxSinkQueueFiles` cap that limited the number
of files that could be buffered in the queue when merging SSTs during a
backup. The efficacy of and need for this cap are not evident, and we
already have a byte limit on how large the queue can grow. Removing it
reduces the number of variables that need to be tuned to achieve good
file-merging behaviour.

Informs: #73815

Release note: None
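
In effect, the sink's flush decision now depends only on the byte budget. A
simplified sketch of the logic in `backup_processor.go` below (the type and
field names here are stand-ins; `byteBudget` plays the role of the
`smallFileBuffer` cluster setting):

```go
package sketch

import (
	"bytes"
	"sort"
)

type bufferedSST struct {
	startKey []byte
	data     []byte
}

type sink struct {
	queue     []bufferedSST
	queueSize int
}

// push buffers a reply and starts a flush once the byte budget is reached;
// there is no longer a fixed cap on the number of buffered files.
func (s *sink) push(resp bufferedSST, byteBudget int) {
	s.queue = append(s.queue, resp)
	s.queueSize += len(resp.data)
	if s.queueSize >= byteBudget {
		// Sort by start key, then drain the first half (elided) so that
		// out-of-order replies can still be merged into larger files.
		sort.Slice(s.queue, func(i, j int) bool {
			return bytes.Compare(s.queue[i].startKey, s.queue[j].startKey) < 0
		})
	}
}
```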

Co-authored-by: richardjcai <[email protected]>
Co-authored-by: Aayush Shah <[email protected]>
Co-authored-by: Marius Posta <[email protected]>
Co-authored-by: Aditya Maru <[email protected]>
5 people committed Dec 17, 2021
5 parents c0fe9cf + 93be89d + e5d3d89 + 90e3fd3 + 6716f75 commit 9edb27f
Showing 93 changed files with 748 additions and 406 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
@@ -169,4 +169,4 @@ trace.debug.enable boolean false if set, traces for recent requests can be seen
trace.jaeger.agent string the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
-version version 21.2-32 set the active cluster version in the format '<major>.<minor>'
+version version 21.2-34 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
@@ -174,6 +174,6 @@
<tr><td><code>trace.jaeger.agent</code></td><td>string</td><td><code></code></td><td>the address of a Jaeger agent to receive traces using the Jaeger UDP Thrift protocol, as <host>:<port>. If no port is specified, 6381 will be used.</td></tr>
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
-<tr><td><code>version</code></td><td>version</td><td><code>21.2-32</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
+<tr><td><code>version</code></td><td>version</td><td><code>21.2-34</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
2 changes: 2 additions & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -141,6 +141,7 @@ go_test(
"full_cluster_backup_restore_test.go",
"helpers_test.go",
"import_spans_test.go",
"insert_missing_public_schema_namespace_entry_restore_test.go",
"key_rewriter_test.go",
"main_test.go",
"partitioned_backup_test.go",
@@ -193,6 +194,7 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/catalogkv",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
6 changes: 1 addition & 5 deletions pkg/ccl/backupccl/backup_processor.go
@@ -91,10 +91,6 @@ var (
)
)

-// maxSinkQueueFiles is how many replies we'll queue up before flushing to allow
-// some re-ordering, unless we hit smallFileBuffer size first.
-const maxSinkQueueFiles = 24
-
const backupProcessorName = "backupDataProcessor"

// TODO(pbardea): It would be nice if we could add some DistSQL processor tests
@@ -608,7 +604,7 @@ func (s *sstSink) push(ctx context.Context, resp returnedSST) error {
s.queue = append(s.queue, resp)
s.queueSize += len(resp.sst)

-if len(s.queue) >= maxSinkQueueFiles || s.queueSize >= int(smallFileBuffer.Get(s.conf.settings)) {
+if s.queueSize >= int(smallFileBuffer.Get(s.conf.settings)) {
sort.Slice(s.queue, func(i, j int) bool { return s.queue[i].f.Span.Key.Compare(s.queue[j].f.Span.Key) < 0 })

// Drain the first half.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_test.go
@@ -4258,7 +4258,7 @@ func TestRestoreAsOfSystemTimeGCBounds(t *testing.T) {
gcr := roachpb.GCRequest{
// Bogus span to make it a valid request.
RequestHeader: roachpb.RequestHeader{
-Key: keys.SystemSQLCodec.TablePrefix(keys.MinUserDescID),
+Key: keys.SystemSQLCodec.TablePrefix(keys.TestingUserDescID(0)),
EndKey: keys.MaxKey,
},
Threshold: tc.Server(0).Clock().Now(),
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/full_cluster_backup_restore_test.go
@@ -192,7 +192,7 @@ CREATE TABLE data2.foo (a int);

// Check there is no data in the span that we expect user data to be imported.
store := tcRestore.GetFirstStoreFromServer(t, 0)
-startKey := keys.SystemSQLCodec.TablePrefix(keys.MinUserDescID)
+startKey := keys.SystemSQLCodec.TablePrefix(keys.TestingUserDescID(0))
endKey := keys.SystemSQLCodec.TablePrefix(uint32(maxBackupTableID)).PrefixEnd()
it := store.Engine().NewMVCCIterator(storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
UpperBound: endKey,
@@ -429,7 +429,7 @@ func TestDisallowFullClusterRestoreOnNonFreshCluster(t *testing.T) {
sqlDB.Exec(t, `BACKUP TO $1`, LocalFoo)
sqlDBRestore.Exec(t, `CREATE DATABASE foo`)
sqlDBRestore.ExpectErr(t,
"pq: full cluster restore can only be run on a cluster with no tables or databases but found 2 descriptors: \\[foo public\\]",
"pq: full cluster restore can only be run on a cluster with no tables or databases but found 2 descriptors: foo, public",
`RESTORE FROM $1`, LocalFoo,
)
}
93 changes: 93 additions & 0 deletions pkg/ccl/backupccl/insert_missing_public_schema_namespace_entry_restore_test.go
@@ -0,0 +1,93 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package backupccl_test

import (
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

func TestInsertMissingPublicSchemaNamespaceEntry(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
dir, cleanup := testutils.TempDir(t)
defer cleanup()
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
ExternalIODir: dir,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: 1,
BinaryVersionOverride: clusterversion.ByKey(clusterversion.InsertPublicSchemaNamespaceEntryOnRestore - 1),
},
},
},
})
defer tc.Stopper().Stop(ctx)

db := tc.ServerConn(0)
defer db.Close()
sqlDB := sqlutils.MakeSQLRunner(tc.Conns[0])

// Mimic a restore where the public schema system.namespace entries are
// missing.
sqlDB.Exec(t, `CREATE DATABASE db1`)
sqlDB.Exec(t, `CREATE TABLE db1.t()`)
sqlDB.Exec(t, `CREATE SCHEMA db1.s`)
sqlDB.Exec(t, `CREATE DATABASE db2`)
sqlDB.Exec(t, `CREATE TABLE db2.t(x INT)`)
sqlDB.Exec(t, `INSERT INTO db2.t VALUES (1), (2)`)
sqlDB.Exec(t, `CREATE SCHEMA db2.s`)
sqlDB.Exec(t, `CREATE TABLE db2.s.t(x INT)`)
sqlDB.Exec(t, `INSERT INTO db2.s.t VALUES (1), (2)`)

var db1ID, db2ID descpb.ID
row := sqlDB.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'db1'`)
row.Scan(&db1ID)
row = sqlDB.QueryRow(t, `SELECT id FROM system.namespace WHERE name = 'db2'`)
row.Scan(&db2ID)

// Remove system.namespace entries for the public schema for the two
// databases.
err := tc.Servers[0].DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
codec := keys.SystemSQLCodec
b := txn.NewBatch()
b.Del(catalogkeys.MakeSchemaNameKey(codec, db1ID, `public`))
b.Del(catalogkeys.MakeSchemaNameKey(codec, db2ID, `public`))
return txn.Run(ctx, b)
})
require.NoError(t, err)

// Verify that there are no system.namespace entries for the public schema for
// the two databases.
sqlDB.CheckQueryResults(t, fmt.Sprintf(`SELECT id FROM system.namespace WHERE name = 'public' AND "parentID"=%d`, db1ID), [][]string{})
sqlDB.CheckQueryResults(t, fmt.Sprintf(`SELECT id FROM system.namespace WHERE name = 'public' AND "parentID"=%d`, db2ID), [][]string{})

// Kick off migration by upgrading to the new version.
_ = sqlDB.Exec(t, `SET CLUSTER SETTING version = $1`,
clusterversion.ByKey(clusterversion.InsertPublicSchemaNamespaceEntryOnRestore).String())

sqlDB.CheckQueryResults(t, fmt.Sprintf(`SELECT id FROM system.namespace WHERE name = 'public' AND "parentID"=%d`, db1ID), [][]string{{"29"}})
sqlDB.CheckQueryResults(t, fmt.Sprintf(`SELECT id FROM system.namespace WHERE name = 'public' AND "parentID"=%d`, db2ID), [][]string{{"29"}})

}
15 changes: 7 additions & 8 deletions pkg/ccl/backupccl/restore_job.go
@@ -967,15 +967,14 @@ func isSchemaEmpty(
return true, nil
}

-func getTempSystemDBID(details jobspb.RestoreDetails) descpb.ID {
-tempSystemDBID := keys.MinNonPredefinedUserDescID
+func getTempSystemDBID(details jobspb.RestoreDetails, idChecker keys.SystemIDChecker) descpb.ID {
+tempSystemDBID := descpb.ID(catalogkeys.MinNonDefaultUserDescriptorID(idChecker))
for id := range details.DescriptorRewrites {
-if int(id) > tempSystemDBID {
-tempSystemDBID = int(id)
+if id > tempSystemDBID {
+tempSystemDBID = id
}
}

-return descpb.ID(tempSystemDBID)
+return tempSystemDBID
}

// spansForAllRestoreTableIndexes returns non-overlapping spans for every index
@@ -1106,7 +1105,7 @@ func createImportingDescriptors(

tempSystemDBID := descpb.InvalidID
if details.DescriptorCoverage == tree.AllDescriptors {
-tempSystemDBID = getTempSystemDBID(details)
+tempSystemDBID = getTempSystemDBID(details, p.ExecCfg().SystemIDChecker)
tempSystemDB := dbdesc.NewInitial(tempSystemDBID, restoreTempSystemDB,
security.AdminRoleName(), dbdesc.WithPublicSchemaID(keys.SystemPublicSchemaID))
databases = append(databases, tempSystemDB)
@@ -2667,7 +2666,7 @@ func (r *restoreResumer) restoreSystemTables(
restoreDetails jobspb.RestoreDetails,
tables []catalog.TableDescriptor,
) error {
-tempSystemDBID := getTempSystemDBID(restoreDetails)
+tempSystemDBID := getTempSystemDBID(restoreDetails, r.execCfg.SystemIDChecker)
details := r.job.Details().(jobspb.RestoreDetails)
if details.SystemTablesMigrated == nil {
details.SystemTablesMigrated = make(map[string]bool)
57 changes: 21 additions & 36 deletions pkg/ccl/backupccl/restore_planning.go
@@ -323,7 +323,7 @@ func allocateDescriptorRewrites(

// Fail fast if the tables to restore are incompatible with the specified
// options.
-maxDescIDInBackup := int64(keys.MinNonPredefinedUserDescID)
+maxDescIDInBackup := int64(catalogkeys.MinNonDefaultUserDescriptorID(p.ExecCfg().SystemIDChecker))
for _, table := range tablesByID {
if int64(table.ID) > maxDescIDInBackup {
maxDescIDInBackup = int64(table.ID)
@@ -950,7 +950,7 @@ func resolveTargetDB(
return intoDB, nil
}

-if descriptorCoverage == tree.AllDescriptors && descriptor.GetParentID() < keys.MaxReservedDescID {
+if descriptorCoverage == tree.AllDescriptors && catalog.IsSystemDescriptor(descriptor) {
var targetDB string
if descriptor.GetParentID() == systemschema.SystemDB.GetID() {
// For full cluster backups, put the system tables in the temporary
@@ -1367,24 +1367,6 @@ func errOnMissingRange(span covering.Range, start, end hlc.Timestamp) error {
)
}

-func getUserDescriptorNames(
-ctx context.Context, txn *kv.Txn, codec keys.SQLCodec,
-) ([]string, error) {
-allDescs, err := catalogkv.GetAllDescriptors(ctx, txn, codec, true /* shouldRunPostDeserializationChanges */)
-if err != nil {
-return nil, err
-}
-
-var allNames = make([]string, 0, len(allDescs))
-for _, desc := range allDescs {
-if !catalogkeys.IsDefaultCreatedDescriptor(desc.GetID()) {
-allNames = append(allNames, desc.GetName())
-}
-}
-
-return allNames, nil
-}
-
// resolveOptionsForRestoreJobDescription creates a copy of
// the options specified during a restore, after processing
// them to be suitable for displaying in the jobs' description.
@@ -1827,24 +1809,27 @@ func doRestorePlan(
return errors.Errorf("full cluster RESTORE can only be used on full cluster BACKUP files")
}

-// Ensure that no user table descriptors exist for a full cluster restore.
-txn := p.ExecCfg().DB.NewTxn(ctx, "count-user-descs")
-descCount, err := catalogkv.CountUserDescriptors(ctx, txn, p.ExecCfg().Codec)
-if err != nil {
-return errors.Wrap(err, "looking up user descriptors during restore")
-}
-if descCount != 0 && restoreStmt.DescriptorCoverage == tree.AllDescriptors {
-var userDescriptorNames []string
-userDescriptorNames, err := getUserDescriptorNames(ctx, txn, p.ExecCfg().Codec)
+// Ensure that no user descriptors exist for a full cluster restore.
+if restoreStmt.DescriptorCoverage == tree.AllDescriptors {
+txn := p.ExecCfg().DB.NewTxn(ctx, "count-user-descs")
+allUserDescs, err := catalogkv.GetAllUserCreatedDescriptors(ctx, txn, p.ExecCfg().Codec)
if err != nil {
-// We're already returning an error, and we're just trying to make the
-// error message more helpful. If we fail to do that, let's just log.
-log.Errorf(ctx, "fetching user descriptor names: %+v", err)
+return errors.Wrap(err, "looking up user descriptors during restore")
}
+if len(allUserDescs) > 0 {
+userDescriptorNames := make([]string, 0, 20)
+for i, desc := range allUserDescs {
+if i == 20 {
+userDescriptorNames = append(userDescriptorNames, "...")
+break
+}
+userDescriptorNames = append(userDescriptorNames, desc.GetName())
+}
+return errors.Errorf(
+"full cluster restore can only be run on a cluster with no tables or databases but found %d descriptors: %s",
+len(allUserDescs), strings.Join(userDescriptorNames, ", "),
+)
+}
-return errors.Errorf(
-"full cluster restore can only be run on a cluster with no tables or databases but found %d descriptors: %s",
-descCount, userDescriptorNames,
-)
}

// wasOffline tracks which tables were in an offline or adding state at some
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/show_test.go
@@ -206,7 +206,7 @@ ORDER BY object_type, object_name`, full)
// Create tables with the same ID as data.tableA to ensure that comments
// from different tables in the restoring cluster don't appear.
tableA := catalogkv.TestingGetTableDescriptor(kvDB, keys.SystemSQLCodec, "data", "tablea")
-for i := keys.MinUserDescID; i < int(tableA.GetID()); i++ {
+for i := keys.TestingUserDescID(0); i < uint32(tableA.GetID()); i++ {
tableName := fmt.Sprintf("foo%d", i)
sqlDBRestore.Exec(t, fmt.Sprintf("CREATE TABLE %s ();", tableName))
sqlDBRestore.Exec(t, fmt.Sprintf("COMMENT ON TABLE %s IS 'table comment'", tableName))
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/avro_test.go
@@ -65,8 +65,8 @@ func parseTableDesc(createTableStmt string) (catalog.TableDescriptor, error) {
return nil, errors.Errorf("expected *tree.CreateTable got %T", stmt)
}
st := cluster.MakeTestingClusterSettings()
-const parentID = descpb.ID(keys.MaxReservedDescID + 1)
-const tableID = descpb.ID(keys.MaxReservedDescID + 2)
+parentID := descpb.ID(keys.TestingUserDescID(0))
+tableID := descpb.ID(keys.TestingUserDescID(1))
semaCtx := makeTestSemaCtx()
mutDesc, err := importccl.MakeTestingSimpleTableDescriptor(
ctx, &semaCtx, st, createTable, parentID, keys.PublicSchemaID, tableID, importccl.NoFKs, hlc.UnixNano())
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
@@ -3466,7 +3466,7 @@ func TestChangefeedProtectedTimestamps(t *testing.T) {
var (
ctx = context.Background()
userSpan = roachpb.Span{
-Key: keys.UserTableDataMin,
+Key: keys.TestingUserTableDataMin(),
EndKey: keys.TableDataMax,
}
done = make(chan struct{})
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel
@@ -14,7 +14,6 @@ go_library(
deps = [
"//pkg/ccl/utilccl",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/settings",
"//pkg/sql",
"//pkg/sql/catalog",
3 changes: 1 addition & 2 deletions pkg/ccl/changefeedccl/changefeedbase/validate.go
@@ -10,7 +10,6 @@ package changefeedbase

import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/errors"
)
@@ -27,7 +26,7 @@ func ValidateTable(targets jobspb.ChangefeedTargets, tableDesc catalog.TableDesc
// saved in it), but there are subtle differences in the way many of them
// work and this will be under-tested, so disallow them all until demand
// dictates.
-if tableDesc.GetID() < keys.MinUserDescID {
+if catalog.IsSystemDescriptor(tableDesc) {
return errors.Errorf(`CHANGEFEEDs are not supported on system tables`)
}
if tableDesc.IsView() {
1 change: 1 addition & 0 deletions pkg/ccl/importccl/BUILD.bazel
@@ -164,6 +164,7 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catalogkeys",
"//pkg/sql/catalog/catalogkv",
"//pkg/sql/catalog/catformat",
"//pkg/sql/catalog/dbdesc",
4 changes: 2 additions & 2 deletions pkg/ccl/importccl/bench_test.go
@@ -55,7 +55,7 @@ func BenchmarkImportWorkload(b *testing.B) {
ts := timeutil.Now()
var tableSSTs []tableSSTable
for i, table := range g.Tables() {
-tableID := descpb.ID(keys.MinUserDescID + 1 + i)
+tableID := descpb.ID(keys.TestingUserDescID(1 + uint32(i)))
sst, err := format.ToSSTable(table, tableID, ts)
require.NoError(b, err)

@@ -160,7 +160,7 @@

func benchmarkConvertToKVs(b *testing.B, g workload.Generator) {
ctx := context.Background()
-const tableID = descpb.ID(keys.MinUserDescID)
+tableID := descpb.ID(keys.TestingUserDescID(0))
ts := timeutil.Now()

var bytes int64