From a7ab071cea1695e74f0dc8f7e7a5102bdd36a644 Mon Sep 17 00:00:00 2001 From: ajwerner Date: Tue, 28 Mar 2023 00:46:19 -0400 Subject: [PATCH] sql: make deadline errors retryable If a transaction gets finds itself with a deadline before its current timestamp, something that can happen when there are schema changes or when the sqlliveness subsystem is unavailable, we ought to retry the transaction. Today, an assertion failure is sent. This is the wrong behavior. In order to achieve the desired goal, we detect the scenario in MaybeUpdateDeadline and we return an error which implements the appropriate interface to be interpreted as a retry error in the pgwire and sql layers. This change does a few other little things: 1) It simplies the logic to check whether an error is retriable so that all errors which pgwire will treat as client-visible retries are treated as retriable internally. In addition, we retain certain additional checks for retriable errors the sql layer handles explicitly. 2) It makes sure to reset the sqlliveness session in the descs.Collection when restarting transactions. Not doing this, on the one hand, was an oversight that meant that if the session turned over internally, then restarts wouldn't see it. However, it's not actually a bug today because only secondary tenants set the session, and they never acquire a new session. The test utilizes sessions rather than descriptors because the testing knob infrastructure is easier to manipulate. Fixes: #96336 Backports will deal with #76727 Release note (bug fix): In rare cases involving overload and schema changes, users could sometimes, transiently, see errors of the form "deadline below read timestamp is nonsensical; txn has would have no chance to commit". These errors carried and internal pgcode and could not be retried. This form of error is now classified as a retriable error and will be retried automatically either by the client or internally. --- pkg/sql/catalog/descs/BUILD.bazel | 6 ++ pkg/sql/catalog/descs/leased_descriptors.go | 41 ++++++--- pkg/sql/catalog/descs/txn_external_test.go | 96 +++++++++++++++++++++ pkg/sql/conn_executor.go | 50 +++++++---- pkg/sql/conn_executor_test.go | 18 ---- pkg/sql/exec_util.go | 4 + 6 files changed, 171 insertions(+), 44 deletions(-) diff --git a/pkg/sql/catalog/descs/BUILD.bazel b/pkg/sql/catalog/descs/BUILD.bazel index 883e5e32f46a..a7e544342dba 100644 --- a/pkg/sql/catalog/descs/BUILD.bazel +++ b/pkg/sql/catalog/descs/BUILD.bazel @@ -117,18 +117,24 @@ go_test( "//pkg/sql/sem/catid", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", + "//pkg/sql/sqlliveness", + "//pkg/sql/sqlliveness/sqllivenesstestutils", "//pkg/sql/tests", "//pkg/sql/types", "//pkg/testutils/datapathutils", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", + "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/mon", "//pkg/util/randutil", + "//pkg/util/syncutil", + "@com_github_cockroachdb_cockroach_go_v2//crdb", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", "@com_github_lib_pq//oid", "@com_github_stretchr_testify//require", diff --git a/pkg/sql/catalog/descs/leased_descriptors.go b/pkg/sql/catalog/descs/leased_descriptors.go index 24c0e22330ad..32933a1084f4 100644 --- a/pkg/sql/catalog/descs/leased_descriptors.go +++ b/pkg/sql/catalog/descs/leased_descriptors.go @@ -12,6 +12,7 @@ package descs import ( "context" + "fmt" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -198,16 +199,7 @@ func (ld *leasedDescriptors) maybeUpdateDeadline( // expiration as the deadline will serve a purpose. var deadline hlc.Timestamp if session != nil { - if expiration, txnTS := session.Expiration(), txn.ReadTimestamp(); txnTS.Less(expiration) { - deadline = expiration - } else { - // If the session has expired relative to this transaction, propagate - // a clear error that that's what is going on. - return errors.Errorf( - "liveness session expired %s before transaction", - txnTS.GoTime().Sub(expiration.GoTime()), - ) - } + deadline = session.Expiration() } if leaseDeadline, ok := ld.getDeadline(); ok && (deadline.IsEmpty() || leaseDeadline.Less(deadline)) { // Set the deadline to the lease deadline if session expiration is empty @@ -216,11 +208,40 @@ func (ld *leasedDescriptors) maybeUpdateDeadline( } // If the deadline has been set, update the transaction deadline. if !deadline.IsEmpty() { + // If the deadline certainly cannot be met, return an error which will + // be retried explicitly. + if txnTs := txn.ReadTimestamp(); deadline.LessEq(txnTs) { + return &deadlineExpiredError{ + txnTS: txnTs, + expiration: deadline, + } + } return txn.UpdateDeadline(ctx, deadline) } return nil } +// deadlineExpiredError is returned when the deadline from either a descriptor +// lease or a sqlliveness session is before the current transaction timestamp. +// The error is a user-visible retry. +type deadlineExpiredError struct { + txnTS, expiration hlc.Timestamp +} + +func (e *deadlineExpiredError) SafeFormatError(p errors.Printer) (next error) { + p.Printf("liveness session expired %v before transaction", + e.txnTS.GoTime().Sub(e.expiration.GoTime())) + return nil +} + +func (e *deadlineExpiredError) ClientVisibleRetryError() {} + +func (e *deadlineExpiredError) Error() string { + return fmt.Sprint(errors.Formattable(e)) +} + +var _ errors.SafeFormatter = (*deadlineExpiredError)(nil) + func (ld *leasedDescriptors) getDeadline() (deadline hlc.Timestamp, haveDeadline bool) { _ = ld.cache.IterateByID(func(descriptor catalog.NameEntry) error { expiration := descriptor.(lease.LeasedDescriptor).Expiration() diff --git a/pkg/sql/catalog/descs/txn_external_test.go b/pkg/sql/catalog/descs/txn_external_test.go index 89b00e29dccd..aff7073b9f8b 100644 --- a/pkg/sql/catalog/descs/txn_external_test.go +++ b/pkg/sql/catalog/descs/txn_external_test.go @@ -12,15 +12,25 @@ package descs_test import ( "context" + gosql "database/sql" + "math/rand" "testing" + "github.com/cockroachdb/cockroach-go/v2/crdb" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" "github.com/stretchr/testify/require" ) @@ -69,3 +79,89 @@ func TestTxnWithStepping(t *testing.T) { return nil }, isql.SteppingEnabled())) } + +// TestLivenessSessionExpiredErrorResultsInRestartAtSQLLayer ensures that if +// a transaction sees a deadline which is in the past of the current +// transaction timestamp, it'll propagate a retriable error. This should only +// happen in transient situations. +// +// Note that the sqlliveness session is not the only source of a deadline. In +// fact, it's probably more common that schema changes lead to descriptor +// deadlines kicking in. By default, at time of writing, sqlliveness sessions +// only apply to secondary tenants. The test leverages them because they are +// the easiest way to interact with the deadline at a session level. +func TestLivenessSessionExpiredErrorResultsInRestartAtSQLLayer(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var ( + toFail = struct { + syncutil.Mutex + remaining int + }{} + shouldFail = func() bool { + toFail.Lock() + defer toFail.Unlock() + if toFail.remaining > 0 { + toFail.remaining-- + return true + } + return false + } + checkFailed = func(t *testing.T) { + toFail.Lock() + defer toFail.Unlock() + require.Zero(t, toFail.remaining) + } + setRemaining = func(t *testing.T, n int) { + toFail.Lock() + defer toFail.Unlock() + toFail.remaining = n + } + ) + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SQLExecutor: &sql.ExecutorTestingKnobs{ + ForceSQLLivenessSession: true, + }, + SQLLivenessKnobs: &sqlliveness.TestingKnobs{ + SessionOverride: func(ctx context.Context) (sqlliveness.Session, error) { + // Only client sessions have the client tag. We control the only + // client session. + tags := logtags.FromContext(ctx) + if _, hasClient := tags.GetTag("client"); hasClient && shouldFail() { + // This fake session is certainly expired. + return &sqllivenesstestutils.FakeSession{ + ExpTS: hlc.Timestamp{WallTime: 1}, + }, nil + } + return nil, nil + }, + }, + }, + }) + defer s.Stopper().Stop(ctx) + + tdb := sqlutils.MakeSQLRunner(sqlDB) + tdb.Exec(t, "CREATE TABLE t (i INT PRIMARY KEY)") + + // Ensure that the internal retry works seamlessly + t.Run("auto-retry", func(t *testing.T) { + setRemaining(t, rand.Intn(20)) + tdb.Exec(t, `SELECT * FROM t`) + checkFailed(t) + }) + t.Run("explicit transaction", func(t *testing.T) { + setRemaining(t, rand.Intn(20)) + require.NoError(t, crdb.ExecuteTx(ctx, sqlDB, nil, func(tx *gosql.Tx) error { + _, err := tx.Exec("SELECT * FROM t") + if err != nil { + return err + } + _, err = tx.Exec(`INSERT INTO t VALUES (1)`) + return err + })) + checkFailed(t) + }) +} diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index ef8a0a0721df..57944392be2a 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -3102,12 +3102,15 @@ var retriableMinTimestampBoundUnsatisfiableError = errors.Newf( "retriable MinTimestampBoundUnsatisfiableError", ) +// errIsRetriable is true if the error is a client-visible retry error +// or the error is a special error that is handled internally and retried. func errIsRetriable(err error) bool { - return errors.HasType(err, (*kvpb.TransactionRetryWithProtoRefreshError)(nil)) || - scerrors.ConcurrentSchemaChangeDescID(err) != descpb.InvalidID || + return errors.HasInterface(err, (*pgerror.ClientVisibleRetryError)(nil)) || errors.Is(err, retriableMinTimestampBoundUnsatisfiableError) || + // Note that this error is not handled internally and can make it to the + // client in implicit transactions. This is not great; it should + // be marked as a client visible retry error. errors.Is(err, descidgen.ErrDescIDSequenceMigrationInProgress) || - execinfra.IsDynamicQueryHasNoHomeRegionError(err) || descs.IsTwoVersionInvariantViolationError(err) } @@ -3517,18 +3520,8 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( // Session is considered active when executing a transaction. ex.totalActiveTimeStopWatch.Start() - if !ex.server.cfg.Codec.ForSystemTenant() { - // Update the leased descriptor collection with the current sqlliveness.Session. - // This is required in the multi-tenant environment to update the transaction - // deadline to either the session expiry or the leased descriptor deadline, - // whichever is sooner. We need this to ensure that transactions initiated - // by ephemeral SQL pods in multi-tenant environments are committed before the - // session expires. - session, err := ex.server.cfg.SQLLiveness.Session(ex.Ctx()) - if err != nil { - return advanceInfo{}, err - } - ex.extraTxnState.descCollection.SetSession(session) + if err := ex.maybeSetSQLLivenessSession(); err != nil { + return advanceInfo{}, err } case txnCommit: if res.Err() != nil { @@ -3605,8 +3598,15 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( } fallthrough - case txnRestart, txnRollback: + case txnRollback: ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent) + case txnRestart: + // In addition to resetting the extraTxnState, the restart event may + // also need to reset the sqlliveness.Session. + ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent) + if err := ex.maybeSetSQLLivenessSession(); err != nil { + return advanceInfo{}, err + } default: return advanceInfo{}, errors.AssertionFailedf( "unexpected event: %v", errors.Safe(advInfo.txnEvent)) @@ -3614,6 +3614,24 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( return advInfo, nil } +func (ex *connExecutor) maybeSetSQLLivenessSession() error { + if !ex.server.cfg.Codec.ForSystemTenant() || + ex.server.cfg.TestingKnobs.ForceSQLLivenessSession { + // Update the leased descriptor collection with the current sqlliveness.Session. + // This is required in the multi-tenant environment to update the transaction + // deadline to either the session expiry or the leased descriptor deadline, + // whichever is sooner. We need this to ensure that transactions initiated + // by ephemeral SQL pods in multi-tenant environments are committed before the + // session expires. + session, err := ex.server.cfg.SQLLiveness.Session(ex.Ctx()) + if err != nil { + return err + } + ex.extraTxnState.descCollection.SetSession(session) + } + return nil +} + func (ex *connExecutor) handleWaitingForConcurrentSchemaChanges( ctx context.Context, descID descpb.ID, ) error { diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go index 7b8a1404307b..301ae303a785 100644 --- a/pkg/sql/conn_executor_test.go +++ b/pkg/sql/conn_executor_test.go @@ -1264,24 +1264,6 @@ CREATE TABLE t1.test (k INT PRIMARY KEY, v TEXT); locked(func() { require.True(t, mu.txnDeadline.Less(fs.Expiration())) }) }) - t.Run("expired session leads to clear error", func(t *testing.T) { - // In this test we use an intentionally expired session in the tenant - // and observe that we get a clear error indicating that the session - // was expired. - sessionDuration := -time.Nanosecond - fs := fakeSession{ - ExpTS: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0), - } - defer setClientSessionOverride(&fs)() - txn, err := sqlConn.Begin() - if err != nil { - t.Fatal(err) - } - _, err = txn.ExecContext(ctx, "UPSERT INTO t1.test(k, v) VALUES (1, 'abc')") - require.Regexp(t, `liveness session expired (\S+) before transaction`, err) - require.NoError(t, txn.Rollback()) - }) - t.Run("single_tenant_ignore_session_expiry", func(t *testing.T) { // In this test, we check that the session expiry is ignored in a single-tenant // environment. To verify this, we deliberately set the session duration to be diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index fe4fd1cbed3d..208a9042e8c3 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1595,6 +1595,10 @@ type ExecutorTestingKnobs struct { // BeforeCopyFromInsert, if set, will be called during a COPY FROM insert statement. BeforeCopyFromInsert func() error + + // ForceSQLLivenessSession will force the use of a sqlliveness session for + // transaction deadlines even in the system tenant. + ForceSQLLivenessSession bool } // PGWireTestingKnobs contains knobs for the pgwire module.