diff --git a/pkg/sql/catalog/descs/BUILD.bazel b/pkg/sql/catalog/descs/BUILD.bazel index 883e5e32f46a..a7e544342dba 100644 --- a/pkg/sql/catalog/descs/BUILD.bazel +++ b/pkg/sql/catalog/descs/BUILD.bazel @@ -117,18 +117,24 @@ go_test( "//pkg/sql/sem/catid", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", + "//pkg/sql/sqlliveness", + "//pkg/sql/sqlliveness/sqllivenesstestutils", "//pkg/sql/tests", "//pkg/sql/types", "//pkg/testutils/datapathutils", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", + "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/mon", "//pkg/util/randutil", + "//pkg/util/syncutil", + "@com_github_cockroachdb_cockroach_go_v2//crdb", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", "@com_github_lib_pq//oid", "@com_github_stretchr_testify//require", diff --git a/pkg/sql/catalog/descs/leased_descriptors.go b/pkg/sql/catalog/descs/leased_descriptors.go index 24c0e22330ad..32933a1084f4 100644 --- a/pkg/sql/catalog/descs/leased_descriptors.go +++ b/pkg/sql/catalog/descs/leased_descriptors.go @@ -12,6 +12,7 @@ package descs import ( "context" + "fmt" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -198,16 +199,7 @@ func (ld *leasedDescriptors) maybeUpdateDeadline( // expiration as the deadline will serve a purpose. var deadline hlc.Timestamp if session != nil { - if expiration, txnTS := session.Expiration(), txn.ReadTimestamp(); txnTS.Less(expiration) { - deadline = expiration - } else { - // If the session has expired relative to this transaction, propagate - // a clear error that that's what is going on. - return errors.Errorf( - "liveness session expired %s before transaction", - txnTS.GoTime().Sub(expiration.GoTime()), - ) - } + deadline = session.Expiration() } if leaseDeadline, ok := ld.getDeadline(); ok && (deadline.IsEmpty() || leaseDeadline.Less(deadline)) { // Set the deadline to the lease deadline if session expiration is empty @@ -216,11 +208,40 @@ func (ld *leasedDescriptors) maybeUpdateDeadline( } // If the deadline has been set, update the transaction deadline. if !deadline.IsEmpty() { + // If the deadline certainly cannot be met, return an error which will + // be retried explicitly. + if txnTs := txn.ReadTimestamp(); deadline.LessEq(txnTs) { + return &deadlineExpiredError{ + txnTS: txnTs, + expiration: deadline, + } + } return txn.UpdateDeadline(ctx, deadline) } return nil } +// deadlineExpiredError is returned when the deadline from either a descriptor +// lease or a sqlliveness session is before the current transaction timestamp. +// The error is a user-visible retry. +type deadlineExpiredError struct { + txnTS, expiration hlc.Timestamp +} + +func (e *deadlineExpiredError) SafeFormatError(p errors.Printer) (next error) { + p.Printf("liveness session expired %v before transaction", + e.txnTS.GoTime().Sub(e.expiration.GoTime())) + return nil +} + +func (e *deadlineExpiredError) ClientVisibleRetryError() {} + +func (e *deadlineExpiredError) Error() string { + return fmt.Sprint(errors.Formattable(e)) +} + +var _ errors.SafeFormatter = (*deadlineExpiredError)(nil) + func (ld *leasedDescriptors) getDeadline() (deadline hlc.Timestamp, haveDeadline bool) { _ = ld.cache.IterateByID(func(descriptor catalog.NameEntry) error { expiration := descriptor.(lease.LeasedDescriptor).Expiration() diff --git a/pkg/sql/catalog/descs/txn_external_test.go b/pkg/sql/catalog/descs/txn_external_test.go index 89b00e29dccd..aff7073b9f8b 100644 --- a/pkg/sql/catalog/descs/txn_external_test.go +++ b/pkg/sql/catalog/descs/txn_external_test.go @@ -12,15 +12,25 @@ package descs_test import ( "context" + gosql "database/sql" + "math/rand" "testing" + "github.com/cockroachdb/cockroach-go/v2/crdb" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" "github.com/stretchr/testify/require" ) @@ -69,3 +79,89 @@ func TestTxnWithStepping(t *testing.T) { return nil }, isql.SteppingEnabled())) } + +// TestLivenessSessionExpiredErrorResultsInRestartAtSQLLayer ensures that if +// a transaction sees a deadline which is in the past of the current +// transaction timestamp, it'll propagate a retriable error. This should only +// happen in transient situations. +// +// Note that the sqlliveness session is not the only source of a deadline. In +// fact, it's probably more common that schema changes lead to descriptor +// deadlines kicking in. By default, at time of writing, sqlliveness sessions +// only apply to secondary tenants. The test leverages them because they are +// the easiest way to interact with the deadline at a session level. +func TestLivenessSessionExpiredErrorResultsInRestartAtSQLLayer(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var ( + toFail = struct { + syncutil.Mutex + remaining int + }{} + shouldFail = func() bool { + toFail.Lock() + defer toFail.Unlock() + if toFail.remaining > 0 { + toFail.remaining-- + return true + } + return false + } + checkFailed = func(t *testing.T) { + toFail.Lock() + defer toFail.Unlock() + require.Zero(t, toFail.remaining) + } + setRemaining = func(t *testing.T, n int) { + toFail.Lock() + defer toFail.Unlock() + toFail.remaining = n + } + ) + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SQLExecutor: &sql.ExecutorTestingKnobs{ + ForceSQLLivenessSession: true, + }, + SQLLivenessKnobs: &sqlliveness.TestingKnobs{ + SessionOverride: func(ctx context.Context) (sqlliveness.Session, error) { + // Only client sessions have the client tag. We control the only + // client session. + tags := logtags.FromContext(ctx) + if _, hasClient := tags.GetTag("client"); hasClient && shouldFail() { + // This fake session is certainly expired. + return &sqllivenesstestutils.FakeSession{ + ExpTS: hlc.Timestamp{WallTime: 1}, + }, nil + } + return nil, nil + }, + }, + }, + }) + defer s.Stopper().Stop(ctx) + + tdb := sqlutils.MakeSQLRunner(sqlDB) + tdb.Exec(t, "CREATE TABLE t (i INT PRIMARY KEY)") + + // Ensure that the internal retry works seamlessly + t.Run("auto-retry", func(t *testing.T) { + setRemaining(t, rand.Intn(20)) + tdb.Exec(t, `SELECT * FROM t`) + checkFailed(t) + }) + t.Run("explicit transaction", func(t *testing.T) { + setRemaining(t, rand.Intn(20)) + require.NoError(t, crdb.ExecuteTx(ctx, sqlDB, nil, func(tx *gosql.Tx) error { + _, err := tx.Exec("SELECT * FROM t") + if err != nil { + return err + } + _, err = tx.Exec(`INSERT INTO t VALUES (1)`) + return err + })) + checkFailed(t) + }) +} diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index ef8a0a0721df..57944392be2a 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -3102,12 +3102,15 @@ var retriableMinTimestampBoundUnsatisfiableError = errors.Newf( "retriable MinTimestampBoundUnsatisfiableError", ) +// errIsRetriable is true if the error is a client-visible retry error +// or the error is a special error that is handled internally and retried. func errIsRetriable(err error) bool { - return errors.HasType(err, (*kvpb.TransactionRetryWithProtoRefreshError)(nil)) || - scerrors.ConcurrentSchemaChangeDescID(err) != descpb.InvalidID || + return errors.HasInterface(err, (*pgerror.ClientVisibleRetryError)(nil)) || errors.Is(err, retriableMinTimestampBoundUnsatisfiableError) || + // Note that this error is not handled internally and can make it to the + // client in implicit transactions. This is not great; it should + // be marked as a client visible retry error. errors.Is(err, descidgen.ErrDescIDSequenceMigrationInProgress) || - execinfra.IsDynamicQueryHasNoHomeRegionError(err) || descs.IsTwoVersionInvariantViolationError(err) } @@ -3517,18 +3520,8 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( // Session is considered active when executing a transaction. ex.totalActiveTimeStopWatch.Start() - if !ex.server.cfg.Codec.ForSystemTenant() { - // Update the leased descriptor collection with the current sqlliveness.Session. - // This is required in the multi-tenant environment to update the transaction - // deadline to either the session expiry or the leased descriptor deadline, - // whichever is sooner. We need this to ensure that transactions initiated - // by ephemeral SQL pods in multi-tenant environments are committed before the - // session expires. - session, err := ex.server.cfg.SQLLiveness.Session(ex.Ctx()) - if err != nil { - return advanceInfo{}, err - } - ex.extraTxnState.descCollection.SetSession(session) + if err := ex.maybeSetSQLLivenessSession(); err != nil { + return advanceInfo{}, err } case txnCommit: if res.Err() != nil { @@ -3605,8 +3598,15 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( } fallthrough - case txnRestart, txnRollback: + case txnRollback: ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent) + case txnRestart: + // In addition to resetting the extraTxnState, the restart event may + // also need to reset the sqlliveness.Session. + ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent) + if err := ex.maybeSetSQLLivenessSession(); err != nil { + return advanceInfo{}, err + } default: return advanceInfo{}, errors.AssertionFailedf( "unexpected event: %v", errors.Safe(advInfo.txnEvent)) @@ -3614,6 +3614,24 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( return advInfo, nil } +func (ex *connExecutor) maybeSetSQLLivenessSession() error { + if !ex.server.cfg.Codec.ForSystemTenant() || + ex.server.cfg.TestingKnobs.ForceSQLLivenessSession { + // Update the leased descriptor collection with the current sqlliveness.Session. + // This is required in the multi-tenant environment to update the transaction + // deadline to either the session expiry or the leased descriptor deadline, + // whichever is sooner. We need this to ensure that transactions initiated + // by ephemeral SQL pods in multi-tenant environments are committed before the + // session expires. + session, err := ex.server.cfg.SQLLiveness.Session(ex.Ctx()) + if err != nil { + return err + } + ex.extraTxnState.descCollection.SetSession(session) + } + return nil +} + func (ex *connExecutor) handleWaitingForConcurrentSchemaChanges( ctx context.Context, descID descpb.ID, ) error { diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go index 7b8a1404307b..301ae303a785 100644 --- a/pkg/sql/conn_executor_test.go +++ b/pkg/sql/conn_executor_test.go @@ -1264,24 +1264,6 @@ CREATE TABLE t1.test (k INT PRIMARY KEY, v TEXT); locked(func() { require.True(t, mu.txnDeadline.Less(fs.Expiration())) }) }) - t.Run("expired session leads to clear error", func(t *testing.T) { - // In this test we use an intentionally expired session in the tenant - // and observe that we get a clear error indicating that the session - // was expired. - sessionDuration := -time.Nanosecond - fs := fakeSession{ - ExpTS: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0), - } - defer setClientSessionOverride(&fs)() - txn, err := sqlConn.Begin() - if err != nil { - t.Fatal(err) - } - _, err = txn.ExecContext(ctx, "UPSERT INTO t1.test(k, v) VALUES (1, 'abc')") - require.Regexp(t, `liveness session expired (\S+) before transaction`, err) - require.NoError(t, txn.Rollback()) - }) - t.Run("single_tenant_ignore_session_expiry", func(t *testing.T) { // In this test, we check that the session expiry is ignored in a single-tenant // environment. To verify this, we deliberately set the session duration to be diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index fe4fd1cbed3d..208a9042e8c3 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1595,6 +1595,10 @@ type ExecutorTestingKnobs struct { // BeforeCopyFromInsert, if set, will be called during a COPY FROM insert statement. BeforeCopyFromInsert func() error + + // ForceSQLLivenessSession will force the use of a sqlliveness session for + // transaction deadlines even in the system tenant. + ForceSQLLivenessSession bool } // PGWireTestingKnobs contains knobs for the pgwire module.