diff --git a/pkg/jobs/BUILD.bazel b/pkg/jobs/BUILD.bazel index bc421798be84..464296abb802 100644 --- a/pkg/jobs/BUILD.bazel +++ b/pkg/jobs/BUILD.bazel @@ -47,6 +47,7 @@ go_library( "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", "//pkg/sql/isql", + "//pkg/sql/pgwire/pgcode", "//pkg/sql/pgwire/pgerror", "//pkg/sql/protoreflect", "//pkg/sql/sem/builtins", diff --git a/pkg/jobs/job_info_storage.go b/pkg/jobs/job_info_storage.go index 69bd3ba093fe..d749910f835c 100644 --- a/pkg/jobs/job_info_storage.go +++ b/pkg/jobs/job_info_storage.go @@ -235,7 +235,10 @@ func (i InfoStorage) Write(ctx context.Context, infoKey string, value []byte) er if value == nil { return errors.AssertionFailedf("missing value (infoKey %q)", infoKey) } - return i.write(ctx, infoKey, value) + if err := i.write(ctx, infoKey, value); err != nil { + return MaybeGenerateForcedRetryableError(ctx, i.txn.KV(), err) + } + return nil } // Delete removes the info record for the provided infoKey. diff --git a/pkg/jobs/utils.go b/pkg/jobs/utils.go index 80bc909cc221..11f4ca568731 100644 --- a/pkg/jobs/utils.go +++ b/pkg/jobs/utils.go @@ -17,7 +17,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/errors" ) @@ -140,3 +143,45 @@ ORDER BY created` } return false /* exists */, err } + +// IsJobTypeColumnDoesNotExistError returns true if the error is of the form +// `column "job_type" does not exist`. +func isJobTypeColumnDoesNotExistError(err error) bool { + return pgerror.GetPGCode(err) == pgcode.UndefinedColumn && + strings.Contains(err.Error(), "column \"job_type\" does not exist") +} + +// isJobInfoTableDoesNotExistError returns true if the error is of the form +// `related "job_info" does not exist`. +func isJobInfoTableDoesNotExistError(err error) bool { + return pgerror.GetPGCode(err) == pgcode.UndefinedTable && + strings.Contains(err.Error(), "relation \"system.job_info\" does not exist") +} + +// MaybeGenerateForcedRetryableError returns a +// TransactionRetryWithProtoRefreshError that will cause the txn to be retried +// if the error is because of an undefined job_type column or missing job_info +// table. +// +// In https://github.com/cockroachdb/cockroach/issues/106762 we noticed that if +// a query is executed with an AS OF SYSTEM TIME clause that picks a transaction +// timestamp before the job_type migration, then parts of the jobs +// infrastructure will attempt to query the job_type column even though it +// doesn't exist at the transaction's timestamp. +// +// As a short term fix, when we encounter an `UndefinedTable` or +// `UndefinedColumn` error we generate a synthetic retryable error so that the +// txn is pushed to a higher timestamp at which the upgrade will have completed +// and the table/column will be visible. The longer term fix is being tracked in +// https://github.com/cockroachdb/cockroach/issues/106764. +func MaybeGenerateForcedRetryableError(ctx context.Context, txn *kv.Txn, err error) error { + if err != nil && isJobTypeColumnDoesNotExistError(err) { + return txn.GenerateForcedRetryableError(ctx, "synthetic error "+ + "to push timestamp to after the `job_type` upgrade has run") + } + if err != nil && isJobInfoTableDoesNotExistError(err) { + return txn.GenerateForcedRetryableError(ctx, "synthetic error "+ + "to push timestamp to after the `job_info` upgrade has run") + } + return err +} diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 80d8c898361c..19b929cbb975 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -1064,7 +1064,7 @@ func populateSystemJobsTableRows( params..., ) if err != nil { - return matched, err + return matched, jobs.MaybeGenerateForcedRetryableError(ctx, p.Txn(), err) } cleanup := func(ctx context.Context) { @@ -1077,7 +1077,7 @@ func populateSystemJobsTableRows( for { hasNext, err := it.Next(ctx) if !hasNext || err != nil { - return matched, err + return matched, jobs.MaybeGenerateForcedRetryableError(ctx, p.Txn(), err) } currentRow := it.Cur()