From 6b50953299b32480bd536279cd79243fcc85020e Mon Sep 17 00:00:00 2001 From: Oliver Tan Date: Wed, 23 Mar 2022 11:06:07 +1100 Subject: [PATCH] sql: implement crdb_internal.repair_ttl_table_scheduled_job This commit is intended for "repairing" jobs that may have been broken by a broken schema change interaction. Release justification: high priority fix for new functionality Release note (sql change): Adds a `crdb_internal.repair_ttl_table_scheduled_job` builtin, which repairs the given TTL table's scheduled job by supplanting it with a valid schedule. --- docs/generated/sql/functions.md | 2 + pkg/sql/BUILD.bazel | 4 + pkg/sql/check.go | 80 +++++++++++++++---- pkg/sql/check_test.go | 65 ++++++++++++--- pkg/sql/faketreeeval/evalctx.go | 5 ++ .../testdata/logic_test/row_level_ttl | 6 ++ pkg/sql/sem/builtins/builtins.go | 19 +++++ pkg/sql/sem/tree/eval.go | 3 + 8 files changed, 157 insertions(+), 27 deletions(-) diff --git a/docs/generated/sql/functions.md b/docs/generated/sql/functions.md index 3b9f6ab4e135..bda9847c878c 100644 --- a/docs/generated/sql/functions.md +++ b/docs/generated/sql/functions.md @@ -3001,6 +3001,8 @@ SELECT * FROM crdb_internal.check_consistency(true, ‘\x02’, ‘\x04’)

crdb_internal.range_stats(key: bytes) → jsonb

This function is used to retrieve range statistics information as a JSON object.

+crdb_internal.repair_ttl_table_scheduled_job(oid: oid) → void

Repairs the scheduled job for a TTL table if it is missing.

+
crdb_internal.reset_index_usage_stats() → bool

This function is used to clear the collected index usage statistics.

crdb_internal.reset_sql_stats() → bool

This function is used to clear the collected SQL statistics.

diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 4fa37517eec0..f86771b91b45 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -469,6 +469,7 @@ go_test( "backfill_test.go", "builtin_mem_usage_test.go", "builtin_test.go", + "check_test.go", "comment_on_column_test.go", "comment_on_constraint_test.go", "comment_on_database_test.go", @@ -585,6 +586,7 @@ go_test( "//pkg/gossip", "//pkg/jobs", "//pkg/jobs/jobspb", + "//pkg/jobs/jobstest", "//pkg/keys", "//pkg/kv", "//pkg/kv/kvclient", @@ -653,6 +655,7 @@ go_test( "//pkg/sql/sqlliveness", "//pkg/sql/sqlstats", "//pkg/sql/sqltestutils", + "//pkg/sql/sqlutil", "//pkg/sql/stats", "//pkg/sql/stmtdiagnostics", "//pkg/sql/tests", @@ -709,6 +712,7 @@ go_test( "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", "@com_github_gogo_protobuf//proto", + "@com_github_gogo_protobuf//types", "@com_github_jackc_pgconn//:pgconn", "@com_github_jackc_pgtype//:pgtype", "@com_github_jackc_pgx_v4//:pgx", diff --git a/pkg/sql/check.go b/pkg/sql/check.go index c0dfe79e9648..61b6305c1077 100644 --- a/pkg/sql/check.go +++ b/pkg/sql/check.go @@ -604,6 +604,8 @@ func (p *planner) ValidateTTLScheduledJobsInCurrentDB(ctx context.Context) error return nil } +var invalidTableTTLScheduledJobError = errors.Newf("invalid scheduled job for table") + // validateTTLScheduledJobsInCurrentDB is part of the EvalPlanner interface. func (p *planner) validateTTLScheduledJobInTable( ctx context.Context, tableDesc catalog.TableDescriptor, @@ -616,6 +618,14 @@ func (p *planner) validateTTLScheduledJobInTable( execCfg := p.ExecCfg() env := JobSchedulerEnv(execCfg) + wrapError := func(origErr error) error { + return errors.WithHintf( + errors.Mark(origErr, invalidTableTTLScheduledJobError), + `use crdb_internal.repair_ttl_table_scheduled_job(%d) to repair the missing job`, + tableDesc.GetID(), + ) + } + sj, err := jobs.LoadScheduledJob( ctx, env, @@ -625,11 +635,13 @@ func (p *planner) validateTTLScheduledJobInTable( ) if err != nil { if jobs.HasScheduledJobNotFoundError(err) { - return pgerror.Newf( - pgcode.Internal, - "table id %d maps to a non-existent schedule id %d", - tableDesc.GetID(), - ttl.ScheduleID, + return wrapError( + pgerror.Newf( + pgcode.Internal, + "table id %d maps to a non-existent schedule id %d", + tableDesc.GetID(), + ttl.ScheduleID, + ), ) } return errors.Wrapf(err, "error fetching schedule id %d for table id %d", ttl.ScheduleID, tableDesc.GetID()) @@ -637,28 +649,62 @@ func (p *planner) validateTTLScheduledJobInTable( var args catpb.ScheduledRowLevelTTLArgs if err := pbtypes.UnmarshalAny(sj.ExecutionArgs().Args, &args); err != nil { - return pgerror.Wrapf( - err, - pgcode.Internal, - "error unmarshalling scheduled jobs args for table id %d, schedule id %d", - tableDesc.GetID(), - ttl.ScheduleID, + return wrapError( + pgerror.Wrapf( + err, + pgcode.Internal, + "error unmarshalling scheduled jobs args for table id %d, schedule id %d", + tableDesc.GetID(), + ttl.ScheduleID, + ), ) } if args.TableID != tableDesc.GetID() { - return pgerror.Newf( - pgcode.Internal, - "schedule id %d points to table id %d instead of table id %d", - ttl.ScheduleID, - args.TableID, - tableDesc.GetID(), + return wrapError( + pgerror.Newf( + pgcode.Internal, + "schedule id %d points to table id %d instead of table id %d", + ttl.ScheduleID, + args.TableID, + tableDesc.GetID(), + ), ) } return nil } +// RepairTTLScheduledJobForTable is part of the EvalPlanner interface. +func (p *planner) RepairTTLScheduledJobForTable(ctx context.Context, tableID int64) error { + tableDesc, err := p.Descriptors().GetMutableTableByID(ctx, p.txn, descpb.ID(tableID), tree.ObjectLookupFlagsWithRequired()) + if err != nil { + return err + } + validateErr := p.validateTTLScheduledJobInTable(ctx, tableDesc) + if validateErr == nil { + return nil + } + if !errors.HasType(validateErr, invalidTableTTLScheduledJobError) { + return errors.Wrap(validateErr, "error validating TTL on table") + } + sj, err := CreateRowLevelTTLScheduledJob( + ctx, + p.ExecCfg(), + p.txn, + p.User(), + tableDesc.GetID(), + tableDesc.GetRowLevelTTL(), + ) + if err != nil { + return err + } + tableDesc.RowLevelTTL.ScheduleID = sj.ScheduleID() + return p.Descriptors().WriteDesc( + ctx, false /* kvTrace */, tableDesc, p.txn, + ) +} + func formatValues(colNames []string, values tree.Datums) string { var pairs bytes.Buffer for i := range values { diff --git a/pkg/sql/check_test.go b/pkg/sql/check_test.go index b12bc071cbdb..abdf0d95add6 100644 --- a/pkg/sql/check_test.go +++ b/pkg/sql/check_test.go @@ -17,16 +17,26 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobstest" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/errors" + pbtypes "github.com/gogo/protobuf/types" + "github.com/lib/pq" "github.com/stretchr/testify/require" ) @@ -39,7 +49,6 @@ func TestValidateTTLScheduledJobs(t *testing.T) { desc string setup func(t *testing.T, sqlDB *gosql.DB, kvDB *kv.DB, s serverutils.TestServerInterface, tableDesc *tabledesc.Mutable, scheduleID int64) expectedErrRe func(tableID descpb.ID, scheduleID int64) string - cleanup func(t *testing.T, sqlDB *gosql.DB, kvDB *kv.DB, tableID descpb.ID, scheduleID int64) }{ { desc: "not pointing at a valid scheduled job", @@ -50,13 +59,43 @@ func TestValidateTTLScheduledJobs(t *testing.T) { })) }, expectedErrRe: func(tableID descpb.ID, scheduleID int64) string { - return fmt.Sprintf(`table id %d does not have a maps to a non-existent scheduled job id %d`, tableID, scheduleID) + return fmt.Sprintf(`table id %d maps to a non-existent schedule id 0`, tableID) }, - cleanup: func(t *testing.T, sqlDB *gosql.DB, kvDB *kv.DB, tableID descpb.ID, scheduleID int64) { - _, err := sqlDB.Exec(`DROP SCHEDULE $1`, scheduleID) - require.NoError(t, err) - _, err = sqlDB.Exec(`SELECT crdb_internal.repair_ttl_table_scheduled_job($1)`, tableID) - require.NoError(t, err) + }, + { + desc: "scheduled job points at an different table", + setup: func(t *testing.T, sqlDB *gosql.DB, kvDB *kv.DB, s serverutils.TestServerInterface, tableDesc *tabledesc.Mutable, scheduleID int64) { + ie := s.InternalExecutor().(sqlutil.InternalExecutor) + require.NoError(t, kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + sj, err := jobs.LoadScheduledJob( + ctx, + jobstest.NewJobSchedulerTestEnv( + jobstest.UseSystemTables, + timeutil.Now(), + tree.ScheduledBackupExecutor, + ), + scheduleID, + ie, + txn, + ) + if err != nil { + return err + } + var args catpb.ScheduledRowLevelTTLArgs + if err := pbtypes.UnmarshalAny(sj.ExecutionArgs().Args, &args); err != nil { + return err + } + args.TableID = 0 + any, err := pbtypes.MarshalAny(&args) + if err != nil { + return err + } + sj.SetExecutionDetails(sj.ExecutorType(), jobspb.ExecutionArguments{Args: any}) + return sj.Update(ctx, ie, txn) + })) + }, + expectedErrRe: func(tableID descpb.ID, scheduleID int64) string { + return fmt.Sprintf(`schedule id %d points to table id 0 instead of table id %d`, scheduleID, tableID) }, }, } @@ -78,13 +117,19 @@ func TestValidateTTLScheduledJobs(t *testing.T) { _, err = sqlDB.Exec(`SELECT crdb_internal.validate_ttl_scheduled_jobs()`) require.Error(t, err) require.Regexp(t, tc.expectedErrRe(tableDesc.GetID(), scheduleID), err) + var pgxErr *pq.Error + require.True(t, errors.As(err, &pgxErr)) require.Regexp( t, - fmt.Sprintf(`use crdb_internal.repair_ttl_table_scheduled_job(%d) to repair the missing job`, tableDesc.GetID()), - err, + fmt.Sprintf(`use crdb_internal.repair_ttl_table_scheduled_job\(%d\) to repair the missing job`, tableDesc.GetID()), + pgxErr.Hint, ) - tc.cleanup(t, sqlDB, kvDB, tableDesc.GetID(), scheduleID) + // Repair and check jobs are valid. + _, err = sqlDB.Exec(`DROP SCHEDULE $1`, scheduleID) + require.NoError(t, err) + _, err = sqlDB.Exec(`SELECT crdb_internal.repair_ttl_table_scheduled_job($1)`, tableDesc.GetID()) + require.NoError(t, err) _, err = sqlDB.Exec(`SELECT crdb_internal.validate_ttl_scheduled_jobs()`) require.NoError(t, err) }) diff --git a/pkg/sql/faketreeeval/evalctx.go b/pkg/sql/faketreeeval/evalctx.go index 67f6975dcd5d..a9ebd29cb2ea 100644 --- a/pkg/sql/faketreeeval/evalctx.go +++ b/pkg/sql/faketreeeval/evalctx.go @@ -283,6 +283,11 @@ func (*DummyEvalPlanner) ValidateTTLScheduledJobsInCurrentDB(ctx context.Context return errors.WithStack(errEvalPlanner) } +// RepairTTLScheduledJobForTable is part of the EvalPlanner interface. +func (*DummyEvalPlanner) RepairTTLScheduledJobForTable(ctx context.Context, tableID int64) error { + return errors.WithStack(errEvalPlanner) +} + // ExecutorConfig is part of the EvalPlanner interface. func (*DummyEvalPlanner) ExecutorConfig() interface{} { return nil diff --git a/pkg/sql/logictest/testdata/logic_test/row_level_ttl b/pkg/sql/logictest/testdata/logic_test/row_level_ttl index eba93bc61f06..31a7bee13a73 100644 --- a/pkg/sql/logictest/testdata/logic_test/row_level_ttl +++ b/pkg/sql/logictest/testdata/logic_test/row_level_ttl @@ -47,6 +47,12 @@ CREATE TABLE public.tbl ( statement ok SELECT crdb_internal.validate_ttl_scheduled_jobs() +statement ok +SELECT crdb_internal.repair_ttl_table_scheduled_job('tbl'::regclass::oid) + +statement ok +SELECT crdb_internal.validate_ttl_scheduled_jobs() + statement error resetting "ttl_expire_after" is not permitted\nHINT: use `RESET \(ttl\)` to remove TTL from the table ALTER TABLE tbl RESET (ttl_expire_after) diff --git a/pkg/sql/sem/builtins/builtins.go b/pkg/sql/sem/builtins/builtins.go index eac78f26958d..031c6c581a27 100644 --- a/pkg/sql/sem/builtins/builtins.go +++ b/pkg/sql/sem/builtins/builtins.go @@ -6437,6 +6437,25 @@ table's zone configuration this will return NULL.`, }, ), + "crdb_internal.repair_ttl_table_scheduled_job": makeBuiltin( + tree.FunctionProperties{ + Category: categorySystemInfo, + }, + tree.Overload{ + Types: tree.ArgTypes{{"oid", types.Oid}}, + ReturnType: tree.FixedReturnType(types.Void), + Fn: func(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) { + oid := tree.MustBeDOid(args[0]) + if err := evalCtx.Planner.RepairTTLScheduledJobForTable(evalCtx.Ctx(), int64(oid.DInt)); err != nil { + return nil, err + } + return tree.DVoidDatum, nil + }, + Info: `Repairs the scheduled job for a TTL table if it is missing.`, + Volatility: tree.VolatilityVolatile, + }, + ), + "crdb_internal.check_password_hash_format": makeBuiltin( tree.FunctionProperties{ Category: categorySystemInfo, diff --git a/pkg/sql/sem/tree/eval.go b/pkg/sql/sem/tree/eval.go index 0956e331a291..694348ff3f7b 100644 --- a/pkg/sql/sem/tree/eval.go +++ b/pkg/sql/sem/tree/eval.go @@ -3307,6 +3307,9 @@ type EvalPlanner interface { // ValidateTTLScheduledJobsInCurrentDB checks scheduled jobs for each table // in the database maps to a scheduled job. ValidateTTLScheduledJobsInCurrentDB(ctx context.Context) error + // RepairTTLScheduledJob repairs the scheduled job for the given table if + // it is invalid. + RepairTTLScheduledJobForTable(ctx context.Context, tableID int64) error // QueryRowEx executes the supplied SQL statement and returns a single row, or // nil if no row is found, or an error if more that one row is returned.