Skip to content

Commit

Permalink
sql: implement crdb_internal.repair_ttl_table_scheduled_job
Browse files Browse the repository at this point in the history
This commit is intended for "repairing" TTL scheduled jobs that may have been
left invalid by a faulty schema change interaction.

Release justification: high priority fix for new functionality

Release note (sql change): Adds a
`crdb_internal.repair_ttl_table_scheduled_job` builtin, which repairs
the given TTL table's scheduled job by supplanting it with a valid
schedule.
  • Loading branch information
otan committed Mar 23, 2022
1 parent d4e0712 commit 6b50953
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 27 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -3001,6 +3001,8 @@ SELECT * FROM crdb_internal.check_consistency(true, ‘\x02’, ‘\x04’)</p>
</span></td></tr>
<tr><td><a name="crdb_internal.range_stats"></a><code>crdb_internal.range_stats(key: <a href="bytes.html">bytes</a>) &rarr; jsonb</code></td><td><span class="funcdesc"><p>This function is used to retrieve range statistics information as a JSON object.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.repair_ttl_table_scheduled_job"></a><code>crdb_internal.repair_ttl_table_scheduled_job(oid: oid) &rarr; void</code></td><td><span class="funcdesc"><p>Repairs the scheduled job for a TTL table if it is missing.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.reset_index_usage_stats"></a><code>crdb_internal.reset_index_usage_stats() &rarr; <a href="bool.html">bool</a></code></td><td><span class="funcdesc"><p>This function is used to clear the collected index usage statistics.</p>
</span></td></tr>
<tr><td><a name="crdb_internal.reset_sql_stats"></a><code>crdb_internal.reset_sql_stats() &rarr; <a href="bool.html">bool</a></code></td><td><span class="funcdesc"><p>This function is used to clear the collected SQL statistics.</p>
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,7 @@ go_test(
"backfill_test.go",
"builtin_mem_usage_test.go",
"builtin_test.go",
"check_test.go",
"comment_on_column_test.go",
"comment_on_constraint_test.go",
"comment_on_database_test.go",
Expand Down Expand Up @@ -585,6 +586,7 @@ go_test(
"//pkg/gossip",
"//pkg/jobs",
"//pkg/jobs/jobspb",
"//pkg/jobs/jobstest",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient",
Expand Down Expand Up @@ -653,6 +655,7 @@ go_test(
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlstats",
"//pkg/sql/sqltestutils",
"//pkg/sql/sqlutil",
"//pkg/sql/stats",
"//pkg/sql/stmtdiagnostics",
"//pkg/sql/tests",
Expand Down Expand Up @@ -709,6 +712,7 @@ go_test(
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@com_github_gogo_protobuf//proto",
"@com_github_gogo_protobuf//types",
"@com_github_jackc_pgconn//:pgconn",
"@com_github_jackc_pgtype//:pgtype",
"@com_github_jackc_pgx_v4//:pgx",
Expand Down
80 changes: 63 additions & 17 deletions pkg/sql/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,8 @@ func (p *planner) ValidateTTLScheduledJobsInCurrentDB(ctx context.Context) error
return nil
}

// invalidTableTTLScheduledJobError is a reference error used with errors.Mark
// to tag validation failures of a table's TTL scheduled job, so that callers
// can recognize them with errors.Is.
var invalidTableTTLScheduledJobError = errors.New("invalid scheduled job for table")

// validateTTLScheduledJobInTable checks that the given table's TTL schedule ID
// maps to an existing scheduled job whose arguments point back at that table.
func (p *planner) validateTTLScheduledJobInTable(
ctx context.Context, tableDesc catalog.TableDescriptor,
Expand All @@ -616,6 +618,14 @@ func (p *planner) validateTTLScheduledJobInTable(
execCfg := p.ExecCfg()
env := JobSchedulerEnv(execCfg)

wrapError := func(origErr error) error {
return errors.WithHintf(
errors.Mark(origErr, invalidTableTTLScheduledJobError),
`use crdb_internal.repair_ttl_table_scheduled_job(%d) to repair the missing job`,
tableDesc.GetID(),
)
}

sj, err := jobs.LoadScheduledJob(
ctx,
env,
Expand All @@ -625,40 +635,76 @@ func (p *planner) validateTTLScheduledJobInTable(
)
if err != nil {
if jobs.HasScheduledJobNotFoundError(err) {
return pgerror.Newf(
pgcode.Internal,
"table id %d maps to a non-existent schedule id %d",
tableDesc.GetID(),
ttl.ScheduleID,
return wrapError(
pgerror.Newf(
pgcode.Internal,
"table id %d maps to a non-existent schedule id %d",
tableDesc.GetID(),
ttl.ScheduleID,
),
)
}
return errors.Wrapf(err, "error fetching schedule id %d for table id %d", ttl.ScheduleID, tableDesc.GetID())
}

var args catpb.ScheduledRowLevelTTLArgs
if err := pbtypes.UnmarshalAny(sj.ExecutionArgs().Args, &args); err != nil {
return pgerror.Wrapf(
err,
pgcode.Internal,
"error unmarshalling scheduled jobs args for table id %d, schedule id %d",
tableDesc.GetID(),
ttl.ScheduleID,
return wrapError(
pgerror.Wrapf(
err,
pgcode.Internal,
"error unmarshalling scheduled jobs args for table id %d, schedule id %d",
tableDesc.GetID(),
ttl.ScheduleID,
),
)
}

if args.TableID != tableDesc.GetID() {
return pgerror.Newf(
pgcode.Internal,
"schedule id %d points to table id %d instead of table id %d",
ttl.ScheduleID,
args.TableID,
tableDesc.GetID(),
return wrapError(
pgerror.Newf(
pgcode.Internal,
"schedule id %d points to table id %d instead of table id %d",
ttl.ScheduleID,
args.TableID,
tableDesc.GetID(),
),
)
}

return nil
}

// RepairTTLScheduledJobForTable is part of the EvalPlanner interface.
//
// It resolves the mutable descriptor for tableID and validates the table's TTL
// scheduled job. If validation succeeds, nothing is changed. If validation
// fails with an error marked as invalidTableTTLScheduledJobError, a new
// row-level TTL scheduled job is created and the descriptor is rewritten to
// reference the new schedule ID. Any other validation error is returned
// wrapped, without attempting a repair.
func (p *planner) RepairTTLScheduledJobForTable(ctx context.Context, tableID int64) error {
	tableDesc, err := p.Descriptors().GetMutableTableByID(
		ctx, p.txn, descpb.ID(tableID), tree.ObjectLookupFlagsWithRequired(),
	)
	if err != nil {
		return err
	}

	validateErr := p.validateTTLScheduledJobInTable(ctx, tableDesc)
	if validateErr == nil {
		return nil
	}
	// Repairable failures are tagged with errors.Mark, which is detected by
	// errors.Is. errors.HasType compares concrete error types instead of marks,
	// so it cannot tell a marked failure apart from other errors.
	if !errors.Is(validateErr, invalidTableTTLScheduledJobError) {
		return errors.Wrap(validateErr, "error validating TTL on table")
	}

	sj, err := CreateRowLevelTTLScheduledJob(
		ctx,
		p.ExecCfg(),
		p.txn,
		p.User(),
		tableDesc.GetID(),
		tableDesc.GetRowLevelTTL(),
	)
	if err != nil {
		return err
	}

	// Point the descriptor at the freshly created schedule and persist it.
	tableDesc.RowLevelTTL.ScheduleID = sj.ScheduleID()
	return p.Descriptors().WriteDesc(
		ctx, false /* kvTrace */, tableDesc, p.txn,
	)
}

func formatValues(colNames []string, values tree.Datums) string {
var pairs bytes.Buffer
for i := range values {
Expand Down
65 changes: 55 additions & 10 deletions pkg/sql/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,26 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
pbtypes "github.com/gogo/protobuf/types"
"github.com/lib/pq"
"github.com/stretchr/testify/require"
)

Expand All @@ -39,7 +49,6 @@ func TestValidateTTLScheduledJobs(t *testing.T) {
desc string
setup func(t *testing.T, sqlDB *gosql.DB, kvDB *kv.DB, s serverutils.TestServerInterface, tableDesc *tabledesc.Mutable, scheduleID int64)
expectedErrRe func(tableID descpb.ID, scheduleID int64) string
cleanup func(t *testing.T, sqlDB *gosql.DB, kvDB *kv.DB, tableID descpb.ID, scheduleID int64)
}{
{
desc: "not pointing at a valid scheduled job",
Expand All @@ -50,13 +59,43 @@ func TestValidateTTLScheduledJobs(t *testing.T) {
}))
},
expectedErrRe: func(tableID descpb.ID, scheduleID int64) string {
return fmt.Sprintf(`table id %d does not have a maps to a non-existent scheduled job id %d`, tableID, scheduleID)
return fmt.Sprintf(`table id %d maps to a non-existent schedule id 0`, tableID)
},
cleanup: func(t *testing.T, sqlDB *gosql.DB, kvDB *kv.DB, tableID descpb.ID, scheduleID int64) {
_, err := sqlDB.Exec(`DROP SCHEDULE $1`, scheduleID)
require.NoError(t, err)
_, err = sqlDB.Exec(`SELECT crdb_internal.repair_ttl_table_scheduled_job($1)`, tableID)
require.NoError(t, err)
},
{
desc: "scheduled job points at an different table",
setup: func(t *testing.T, sqlDB *gosql.DB, kvDB *kv.DB, s serverutils.TestServerInterface, tableDesc *tabledesc.Mutable, scheduleID int64) {
ie := s.InternalExecutor().(sqlutil.InternalExecutor)
require.NoError(t, kvDB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
sj, err := jobs.LoadScheduledJob(
ctx,
jobstest.NewJobSchedulerTestEnv(
jobstest.UseSystemTables,
timeutil.Now(),
tree.ScheduledBackupExecutor,
),
scheduleID,
ie,
txn,
)
if err != nil {
return err
}
var args catpb.ScheduledRowLevelTTLArgs
if err := pbtypes.UnmarshalAny(sj.ExecutionArgs().Args, &args); err != nil {
return err
}
args.TableID = 0
any, err := pbtypes.MarshalAny(&args)
if err != nil {
return err
}
sj.SetExecutionDetails(sj.ExecutorType(), jobspb.ExecutionArguments{Args: any})
return sj.Update(ctx, ie, txn)
}))
},
expectedErrRe: func(tableID descpb.ID, scheduleID int64) string {
return fmt.Sprintf(`schedule id %d points to table id 0 instead of table id %d`, scheduleID, tableID)
},
},
}
Expand All @@ -78,13 +117,19 @@ func TestValidateTTLScheduledJobs(t *testing.T) {
_, err = sqlDB.Exec(`SELECT crdb_internal.validate_ttl_scheduled_jobs()`)
require.Error(t, err)
require.Regexp(t, tc.expectedErrRe(tableDesc.GetID(), scheduleID), err)
var pgxErr *pq.Error
require.True(t, errors.As(err, &pgxErr))
require.Regexp(
t,
fmt.Sprintf(`use crdb_internal.repair_ttl_table_scheduled_job(%d) to repair the missing job`, tableDesc.GetID()),
err,
fmt.Sprintf(`use crdb_internal.repair_ttl_table_scheduled_job\(%d\) to repair the missing job`, tableDesc.GetID()),
pgxErr.Hint,
)

tc.cleanup(t, sqlDB, kvDB, tableDesc.GetID(), scheduleID)
// Repair and check jobs are valid.
_, err = sqlDB.Exec(`DROP SCHEDULE $1`, scheduleID)
require.NoError(t, err)
_, err = sqlDB.Exec(`SELECT crdb_internal.repair_ttl_table_scheduled_job($1)`, tableDesc.GetID())
require.NoError(t, err)
_, err = sqlDB.Exec(`SELECT crdb_internal.validate_ttl_scheduled_jobs()`)
require.NoError(t, err)
})
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/faketreeeval/evalctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,11 @@ func (*DummyEvalPlanner) ValidateTTLScheduledJobsInCurrentDB(ctx context.Context
return errors.WithStack(errEvalPlanner)
}

// RepairTTLScheduledJobForTable is part of the EvalPlanner interface.
// The dummy implementation always returns errEvalPlanner wrapped with a stack
// trace.
func (*DummyEvalPlanner) RepairTTLScheduledJobForTable(ctx context.Context, tableID int64) error {
	err := errors.WithStack(errEvalPlanner)
	return err
}

// ExecutorConfig is part of the EvalPlanner interface.
func (*DummyEvalPlanner) ExecutorConfig() interface{} {
return nil
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/row_level_ttl
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ CREATE TABLE public.tbl (
statement ok
SELECT crdb_internal.validate_ttl_scheduled_jobs()

statement ok
SELECT crdb_internal.repair_ttl_table_scheduled_job('tbl'::regclass::oid)

statement ok
SELECT crdb_internal.validate_ttl_scheduled_jobs()

statement error resetting "ttl_expire_after" is not permitted\nHINT: use `RESET \(ttl\)` to remove TTL from the table
ALTER TABLE tbl RESET (ttl_expire_after)

Expand Down
19 changes: 19 additions & 0 deletions pkg/sql/sem/builtins/builtins.go
Original file line number Diff line number Diff line change
Expand Up @@ -6437,6 +6437,25 @@ table's zone configuration this will return NULL.`,
},
),

"crdb_internal.repair_ttl_table_scheduled_job": makeBuiltin(
tree.FunctionProperties{
Category: categorySystemInfo,
},
tree.Overload{
Types: tree.ArgTypes{{"oid", types.Oid}},
ReturnType: tree.FixedReturnType(types.Void),
Fn: func(evalCtx *tree.EvalContext, args tree.Datums) (tree.Datum, error) {
oid := tree.MustBeDOid(args[0])
if err := evalCtx.Planner.RepairTTLScheduledJobForTable(evalCtx.Ctx(), int64(oid.DInt)); err != nil {
return nil, err
}
return tree.DVoidDatum, nil
},
Info: `Repairs the scheduled job for a TTL table if it is missing.`,
Volatility: tree.VolatilityVolatile,
},
),

"crdb_internal.check_password_hash_format": makeBuiltin(
tree.FunctionProperties{
Category: categorySystemInfo,
Expand Down
3 changes: 3 additions & 0 deletions pkg/sql/sem/tree/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -3307,6 +3307,9 @@ type EvalPlanner interface {
// ValidateTTLScheduledJobsInCurrentDB checks scheduled jobs for each table
// in the database maps to a scheduled job.
ValidateTTLScheduledJobsInCurrentDB(ctx context.Context) error
// RepairTTLScheduledJobForTable repairs the scheduled job for the given
// table if it is invalid.
RepairTTLScheduledJobForTable(ctx context.Context, tableID int64) error

// QueryRowEx executes the supplied SQL statement and returns a single row, or
// nil if no row is found, or an error if more that one row is returned.
Expand Down

0 comments on commit 6b50953

Please sign in to comment.