Skip to content

Commit

Permalink
sql,descs: add & adopt descriptor_validation session var
Browse files Browse the repository at this point in the history
This commit removes the `sql.catalog.descs.validate_on_write.enabled`
cluster setting and replaces it with the `descriptor_validation` session
variable. The default value is 'on' which indicates that catalog
descriptors are to be validated when both read from- and written to
the system.descriptor table. Other possible values are 'off' which
disables validation entirely and 'read_only' which disables it for
writes.

Informs cockroachdb#50651.

Release note (sql change): added a new 'descriptor_validation' session
variable which can be set to 'read_only' or 'off' to disable descriptor
validation, which may be useful when mitigating or recovering from
catalog corruption.
  • Loading branch information
Marius Posta committed Oct 27, 2022
1 parent f706ed2 commit a15b492
Show file tree
Hide file tree
Showing 45 changed files with 351 additions and 158 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/avro_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func parseTableDesc(createTableStmt string) (catalog.TableDescriptor, error) {
mutDesc.Families = []descpb.ColumnFamilyDescriptor{
{ID: primary, Name: "primary", ColumnIDs: mutDesc.PublicColumnIDs(), ColumnNames: columnNames},
}
return mutDesc, descbuilder.ValidateSelf(mutDesc, clusterversion.TestingClusterVersion)
return mutDesc, descbuilder.ValidateSelf(mutDesc, clusterversion.TestingClusterVersion, nil /* sd */)
}

func parseValues(tableDesc catalog.TableDescriptor, values string) ([]rowenc.EncDatumRow, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/cdcevent/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func newRowFetcherCache(
return &rowFetcherCache{
codec: codec,
leaseMgr: leaseMgr,
collection: cf.NewCollection(ctx, nil /* TemporarySchemaProvider */, nil /* monitor */),
collection: cf.NewCollection(ctx, nil /* sds */, nil /* monitor */),
db: db,
fetchers: cache.NewUnorderedCache(DefaultCacheConfig),
watchedFamilies: watchedFamilies,
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -1166,7 +1166,7 @@ func getQualifiedTableName(
func getQualifiedTableNameObj(
ctx context.Context, execCfg *sql.ExecutorConfig, txn *kv.Txn, desc catalog.TableDescriptor,
) (tree.TableName, error) {
col := execCfg.CollectionFactory.NewCollection(ctx, nil /* TemporarySchemaProvider */, nil /* monitor */)
col := execCfg.CollectionFactory.NewCollection(ctx, nil /* sds */, nil /* monitor */)
dbDesc, err := col.Direct().MustGetDatabaseDescByID(ctx, txn, desc.GetParentID())
if err != nil {
return tree.TableName{}, err
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/partitionccl/partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ func (pt *partitioningTest) parse() error {
return err
}
pt.parsed.tableDesc = mutDesc
if err := descbuilder.ValidateSelf(pt.parsed.tableDesc, clusterversion.TestingClusterVersion); err != nil {
if err := descbuilder.ValidateSelf(
pt.parsed.tableDesc, clusterversion.TestingClusterVersion, nil, /* sd */
); err != nil {
return err
}
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/jobs/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,9 @@ func writeMutation(
) {
tableDesc.Mutations = append(tableDesc.Mutations, m)
tableDesc.Version++
if err := descbuilder.ValidateSelf(tableDesc, clusterversion.TestingClusterVersion); err != nil {
if err := descbuilder.ValidateSelf(
tableDesc, clusterversion.TestingClusterVersion, nil, /* sd */
); err != nil {
t.Fatal(err)
}
if err := kvDB.Put(
Expand Down
1 change: 1 addition & 0 deletions pkg/settings/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ var retiredSettings = map[string]struct{}{
"sql.ttl.default_range_concurrency": {},

// removed as of 23.1.
"sql.catalog.descs.validate_on_write.enabled": {},
"sql.distsql.max_running_flows": {},
"sql.distsql.flow_scheduler_queueing.enabled": {},
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/alter_primary_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ func (p *planner) AlterPrimaryKey(
}
tableDesc.AddPrimaryKeySwapMutation(swapArgs)

if err := descbuilder.ValidateSelf(tableDesc, version); err != nil {
if err := descbuilder.ValidateSelf(tableDesc, version, p.SessionData()); err != nil {
return err
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/catalog/descbuilder/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ go_library(
"//pkg/sql/catalog/schemadesc",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/catalog/typedesc",
"//pkg/sql/sessiondata",
"//pkg/sql/sessiondatapb",
"//pkg/util/hlc",
"//pkg/util/protoutil",
"@com_github_cockroachdb_errors//:errors",
Expand Down
12 changes: 11 additions & 1 deletion pkg/sql/catalog/descbuilder/desc_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemadesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -174,6 +176,14 @@ func BuildMutable(
}

// ValidateSelf validates that the descriptor is internally consistent.
func ValidateSelf(desc catalog.Descriptor, version clusterversion.ClusterVersion) error {
//
// When the optional session data is provided and the descriptor validation mode
// session var is set to 'off', no validation is performed.
func ValidateSelf(
desc catalog.Descriptor, version clusterversion.ClusterVersion, sd *sessiondata.SessionData,
) error {
if sd != nil && sd.DescriptorValidationMode == sessiondatapb.DescriptorValidationOff {
return nil
}
return validate.Self(version, desc)
}
2 changes: 1 addition & 1 deletion pkg/sql/catalog/descs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ go_library(
"//pkg/keys",
"//pkg/kv",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/sql/catalog",
Expand All @@ -56,6 +55,7 @@ go_library(
"//pkg/sql/sem/catconstants",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sessiondatapb",
"//pkg/sql/sqlerrors",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlutil",
Expand Down
37 changes: 15 additions & 22 deletions pkg/sql/catalog/descs/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catalogkeys"
Expand All @@ -29,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/lease"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
Expand All @@ -48,7 +48,7 @@ func newCollection(
hydrated *hydrateddesc.Cache,
systemDatabase *catkv.SystemDatabaseCache,
virtualSchemas catalog.VirtualSchemas,
temporarySchemaProvider TemporarySchemaProvider,
sds *sessiondata.Stack,
monitor *mon.BytesMonitor,
) *Collection {
v := settings.Version.ActiveVersion(ctx)
Expand All @@ -60,8 +60,8 @@ func newCollection(
virtual: makeVirtualDescriptors(virtualSchemas),
leased: makeLeasedDescriptors(leaseMgr),
uncommitted: makeUncommittedDescriptors(monitor),
stored: catkv.MakeStoredCatalog(cr, monitor),
temporary: makeTemporaryDescriptors(settings, codec, temporarySchemaProvider),
stored: catkv.MakeStoredCatalog(cr, sds, monitor),
sds: sds,
}
}

Expand Down Expand Up @@ -111,8 +111,8 @@ type Collection struct {
// non-public schema elements during a schema change).
synthetic syntheticDescriptors

// temporary contains logic to access temporary schema descriptors.
temporary temporaryDescriptors
// sds is used to access temporary schema descriptors and session vars.
sds *sessiondata.Stack

// hydrated is node-level cache of table descriptors which utilize
// user-defined types.
Expand Down Expand Up @@ -255,22 +255,13 @@ func (tc *Collection) AddUncommittedDescriptor(
return tc.uncommitted.upsert(ctx, desc)
}

// ValidateOnWriteEnabled is the cluster setting used to enable or disable
// validating descriptors prior to writing.
var ValidateOnWriteEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
"sql.catalog.descs.validate_on_write.enabled",
"set to true to validate descriptors prior to writing, false to disable; default is true",
true, /* defaultValue */
)

// WriteDescToBatch calls MaybeIncrementVersion, adds the descriptor to the
// collection as an uncommitted descriptor, and writes it into b.
func (tc *Collection) WriteDescToBatch(
ctx context.Context, kvTrace bool, desc catalog.MutableDescriptor, b *kv.Batch,
) error {
desc.MaybeIncrementVersion()
if !tc.skipValidationOnWrite && ValidateOnWriteEnabled.Get(&tc.settings.SV) {
if tc.validateOnWrite() {
if err := validate.Self(tc.version, desc); err != nil {
return err
}
Expand Down Expand Up @@ -588,19 +579,21 @@ func (tc *Collection) SetSession(session sqlliveness.Session) {
tc.sqlLivenessSession = session
}

// SetTemporaryDescriptors is used in the context of the internal executor
// to override the temporary descriptors during temporary object
// cleanup.
func (tc *Collection) SetTemporaryDescriptors(provider TemporarySchemaProvider) {
tc.temporary = makeTemporaryDescriptors(tc.settings, tc.codec(), provider)
// SetSessionDataStack sets a session data stack for this Collection.
// This is used for looking up:
// - temporary descriptors,
// - collection-specific session data variables.
func (tc *Collection) SetSessionDataStack(sds *sessiondata.Stack) {
tc.sds = sds
tc.stored.SetSessionDataStack(sds)
}

// Direct exports the catkv.Direct interface.
type Direct = catkv.Direct

// Direct provides direct access to the underlying KV-storage.
func (tc *Collection) Direct() Direct {
return catkv.MakeDirect(tc.codec(), tc.version)
return catkv.MakeDirect(tc.codec(), tc.sds, tc.version)
}

// MakeTestCollection makes a Collection that can be used for tests.
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/catalog/descs/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestCollectionWriteDescToBatch(t *testing.T) {

db := s0.DB()
descriptors := s0.ExecutorConfig().(sql.ExecutorConfig).CollectionFactory.
NewCollection(ctx, nil /* TemporarySchemaProvider */, nil /* Monitor */)
NewCollection(ctx, nil /* sds */, nil /* Monitor */)

// Note this transaction abuses the mechanisms normally required for updating
// tables and is just for testing what this test intends to exercise.
Expand Down Expand Up @@ -654,7 +654,7 @@ func TestCollectionProperlyUsesMemoryMonitoring(t *testing.T) {

// Create a `Collection` with monitor hooked up.
col := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).CollectionFactory.
NewCollection(ctx, nil /* temporarySchemaProvider */, monitor)
NewCollection(ctx, nil /* sds */, monitor)
require.Equal(t, int64(0), monitor.AllocBytes())

// Read all the descriptors into `col` and assert this read will finish without error.
Expand All @@ -673,7 +673,7 @@ func TestCollectionProperlyUsesMemoryMonitoring(t *testing.T) {

// Repeat the process again and assert this time memory allocation will err out.
col = tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).CollectionFactory.
NewCollection(ctx, nil /* temporarySchemaProvider */, monitor)
NewCollection(ctx, nil /* sds */, monitor)
_, err2 := col.GetAllDescriptors(ctx, txn)
require.Error(t, err2)

Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/catalog/descs/descriptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemadesc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
Expand Down Expand Up @@ -245,7 +246,7 @@ func (q *byIDLookupContext) lookupVirtual(
func (q *byIDLookupContext) lookupTemporary(
id descpb.ID,
) (catalog.Descriptor, catalog.ValidationLevel, error) {
td := q.tc.temporary.getSchemaByID(id)
td := q.tc.temporary().getSchemaByID(id)
if td == nil {
return nil, catalog.NoValidation, nil
}
Expand Down Expand Up @@ -434,7 +435,7 @@ func (tc *Collection) getNonVirtualDescriptorID(
if !isSchema || !isTemporarySchema(name) {
return continueLookups, descpb.InvalidID, nil
}
avoidFurtherLookups, td := tc.temporary.getSchemaByName(ctx, parentID, name)
avoidFurtherLookups, td := tc.temporary().getSchemaByName(parentID, name)
if td != nil {
return haltLookups, td.GetID(), nil
}
Expand Down Expand Up @@ -551,6 +552,9 @@ func (tc *Collection) finalizeDescriptors(
}
}
// Ensure that all descriptors are sufficiently validated.
if tc.stored.ValidationMode() == sessiondatapb.DescriptorValidationOff {
return nil
}
requiredLevel := validate.MutableRead
if !flags.RequireMutable && !flags.AvoidLeased {
requiredLevel = validate.ImmutableRead
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/catalog/descs/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,13 @@ func NewBareBonesCollectionFactory(

// NewCollection constructs a new Collection.
func (cf *CollectionFactory) NewCollection(
ctx context.Context, temporarySchemaProvider TemporarySchemaProvider, monitor *mon.BytesMonitor,
ctx context.Context, sds *sessiondata.Stack, monitor *mon.BytesMonitor,
) *Collection {
if monitor == nil {
// If an upstream monitor is not provided, the default, unlimited monitor will be used.
// All downstream resource allocation/releases on this default monitor will then be no-ops.
monitor = cf.defaultMonitor
}
return newCollection(ctx, cf.leaseMgr, cf.settings, cf.codec, cf.hydrated, cf.systemDatabase,
cf.virtualSchemas, temporarySchemaProvider, monitor)
cf.virtualSchemas, sds, monitor)
}
Loading

0 comments on commit a15b492

Please sign in to comment.