Skip to content

Commit

Permalink
sql: make deadline errors retryable
Browse files Browse the repository at this point in the history
If a transaction gets finds itself with a deadline before its current
timestamp, something that can happen when there are schema changes or
when the sqlliveness subsystem is unavailable, we ought to retry the
transaction. Today, an assertion failure is sent. This is the wrong
behavior.

In order to achieve the desired goal, we detect the scenario in
MaybeUpdateDeadline and we return an error which implements the
appropriate interface to be interpreted as a retry error in the
pgwire and sql layers.

This change does a few other little things:

1) It simplies the logic to check whether an error is retriable so that
   all errors which pgwire will treat as client-visible retries are treated
   as retriable internally. In addition, we retain certain additional checks
   for retriable errors the sql layer handles explicitly.
2) It makes sure to reset the sqlliveness session in the descs.Collection
   when restarting transactions. Not doing this, on the one hand, was an
   oversight that meant that if the session turned over internally, then
   restarts wouldn't see it. However, it's not actually a bug today because
   only secondary tenants set the session, and they never acquire a new
   session.

The test utilizes sessions rather than descriptors because the testing
knob infrastructure is easier to manipulate.

Fixes: #96336

Backports will deal with #76727

Release note (bug fix): In rare cases involving overload and schema changes,
users could sometimes, transiently, see errors of the form "deadline below
read timestamp is nonsensical; txn has would have no chance to commit". These
errors carried and internal pgcode and could not be retried. This form of error
is now classified as a retriable error and will be retried automatically either
by the client or internally.
  • Loading branch information
ajwerner committed Mar 28, 2023
1 parent f8dc0db commit a7ab071
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 44 deletions.
6 changes: 6 additions & 0 deletions pkg/sql/catalog/descs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,24 @@ go_test(
"//pkg/sql/sem/catid",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/sqllivenesstestutils",
"//pkg/sql/tests",
"//pkg/sql/types",
"//pkg/testutils/datapathutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@com_github_lib_pq//oid",
"@com_github_stretchr_testify//require",
Expand Down
41 changes: 31 additions & 10 deletions pkg/sql/catalog/descs/leased_descriptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package descs

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand Down Expand Up @@ -198,16 +199,7 @@ func (ld *leasedDescriptors) maybeUpdateDeadline(
// expiration as the deadline will serve a purpose.
var deadline hlc.Timestamp
if session != nil {
if expiration, txnTS := session.Expiration(), txn.ReadTimestamp(); txnTS.Less(expiration) {
deadline = expiration
} else {
// If the session has expired relative to this transaction, propagate
// a clear error that that's what is going on.
return errors.Errorf(
"liveness session expired %s before transaction",
txnTS.GoTime().Sub(expiration.GoTime()),
)
}
deadline = session.Expiration()
}
if leaseDeadline, ok := ld.getDeadline(); ok && (deadline.IsEmpty() || leaseDeadline.Less(deadline)) {
// Set the deadline to the lease deadline if session expiration is empty
Expand All @@ -216,11 +208,40 @@ func (ld *leasedDescriptors) maybeUpdateDeadline(
}
// If the deadline has been set, update the transaction deadline.
if !deadline.IsEmpty() {
// If the deadline certainly cannot be met, return an error which will
// be retried explicitly.
if txnTs := txn.ReadTimestamp(); deadline.LessEq(txnTs) {
return &deadlineExpiredError{
txnTS: txnTs,
expiration: deadline,
}
}
return txn.UpdateDeadline(ctx, deadline)
}
return nil
}

// deadlineExpiredError is returned when the deadline from either a descriptor
// lease or a sqlliveness session is before the current transaction timestamp.
// The error is a user-visible retry.
type deadlineExpiredError struct {
txnTS, expiration hlc.Timestamp
}

func (e *deadlineExpiredError) SafeFormatError(p errors.Printer) (next error) {
p.Printf("liveness session expired %v before transaction",
e.txnTS.GoTime().Sub(e.expiration.GoTime()))
return nil
}

func (e *deadlineExpiredError) ClientVisibleRetryError() {}

func (e *deadlineExpiredError) Error() string {
return fmt.Sprint(errors.Formattable(e))
}

var _ errors.SafeFormatter = (*deadlineExpiredError)(nil)

func (ld *leasedDescriptors) getDeadline() (deadline hlc.Timestamp, haveDeadline bool) {
_ = ld.cache.IterateByID(func(descriptor catalog.NameEntry) error {
expiration := descriptor.(lease.LeasedDescriptor).Expiration()
Expand Down
96 changes: 96 additions & 0 deletions pkg/sql/catalog/descs/txn_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,25 @@ package descs_test

import (
"context"
gosql "database/sql"
"math/rand"
"testing"

"github.com/cockroachdb/cockroach-go/v2/crdb"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -69,3 +79,89 @@ func TestTxnWithStepping(t *testing.T) {
return nil
}, isql.SteppingEnabled()))
}

// TestLivenessSessionExpiredErrorResultsInRestartAtSQLLayer ensures that if
// a transaction sees a deadline which is in the past of the current
// transaction timestamp, it'll propagate a retriable error. This should only
// happen in transient situations.
//
// Note that the sqlliveness session is not the only source of a deadline. In
// fact, it's probably more common that schema changes lead to descriptor
// deadlines kicking in. By default, at time of writing, sqlliveness sessions
// only apply to secondary tenants. The test leverages them because they are
// the easiest way to interact with the deadline at a session level.
func TestLivenessSessionExpiredErrorResultsInRestartAtSQLLayer(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
var (
toFail = struct {
syncutil.Mutex
remaining int
}{}
shouldFail = func() bool {
toFail.Lock()
defer toFail.Unlock()
if toFail.remaining > 0 {
toFail.remaining--
return true
}
return false
}
checkFailed = func(t *testing.T) {
toFail.Lock()
defer toFail.Unlock()
require.Zero(t, toFail.remaining)
}
setRemaining = func(t *testing.T, n int) {
toFail.Lock()
defer toFail.Unlock()
toFail.remaining = n
}
)
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
ForceSQLLivenessSession: true,
},
SQLLivenessKnobs: &sqlliveness.TestingKnobs{
SessionOverride: func(ctx context.Context) (sqlliveness.Session, error) {
// Only client sessions have the client tag. We control the only
// client session.
tags := logtags.FromContext(ctx)
if _, hasClient := tags.GetTag("client"); hasClient && shouldFail() {
// This fake session is certainly expired.
return &sqllivenesstestutils.FakeSession{
ExpTS: hlc.Timestamp{WallTime: 1},
}, nil
}
return nil, nil
},
},
},
})
defer s.Stopper().Stop(ctx)

tdb := sqlutils.MakeSQLRunner(sqlDB)
tdb.Exec(t, "CREATE TABLE t (i INT PRIMARY KEY)")

// Ensure that the internal retry works seamlessly
t.Run("auto-retry", func(t *testing.T) {
setRemaining(t, rand.Intn(20))
tdb.Exec(t, `SELECT * FROM t`)
checkFailed(t)
})
t.Run("explicit transaction", func(t *testing.T) {
setRemaining(t, rand.Intn(20))
require.NoError(t, crdb.ExecuteTx(ctx, sqlDB, nil, func(tx *gosql.Tx) error {
_, err := tx.Exec("SELECT * FROM t")
if err != nil {
return err
}
_, err = tx.Exec(`INSERT INTO t VALUES (1)`)
return err
}))
checkFailed(t)
})
}
50 changes: 34 additions & 16 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3102,12 +3102,15 @@ var retriableMinTimestampBoundUnsatisfiableError = errors.Newf(
"retriable MinTimestampBoundUnsatisfiableError",
)

// errIsRetriable is true if the error is a client-visible retry error
// or the error is a special error that is handled internally and retried.
func errIsRetriable(err error) bool {
return errors.HasType(err, (*kvpb.TransactionRetryWithProtoRefreshError)(nil)) ||
scerrors.ConcurrentSchemaChangeDescID(err) != descpb.InvalidID ||
return errors.HasInterface(err, (*pgerror.ClientVisibleRetryError)(nil)) ||
errors.Is(err, retriableMinTimestampBoundUnsatisfiableError) ||
// Note that this error is not handled internally and can make it to the
// client in implicit transactions. This is not great; it should
// be marked as a client visible retry error.
errors.Is(err, descidgen.ErrDescIDSequenceMigrationInProgress) ||
execinfra.IsDynamicQueryHasNoHomeRegionError(err) ||
descs.IsTwoVersionInvariantViolationError(err)
}

Expand Down Expand Up @@ -3517,18 +3520,8 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
// Session is considered active when executing a transaction.
ex.totalActiveTimeStopWatch.Start()

if !ex.server.cfg.Codec.ForSystemTenant() {
// Update the leased descriptor collection with the current sqlliveness.Session.
// This is required in the multi-tenant environment to update the transaction
// deadline to either the session expiry or the leased descriptor deadline,
// whichever is sooner. We need this to ensure that transactions initiated
// by ephemeral SQL pods in multi-tenant environments are committed before the
// session expires.
session, err := ex.server.cfg.SQLLiveness.Session(ex.Ctx())
if err != nil {
return advanceInfo{}, err
}
ex.extraTxnState.descCollection.SetSession(session)
if err := ex.maybeSetSQLLivenessSession(); err != nil {
return advanceInfo{}, err
}
case txnCommit:
if res.Err() != nil {
Expand Down Expand Up @@ -3605,15 +3598,40 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
}

fallthrough
case txnRestart, txnRollback:
case txnRollback:
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent)
case txnRestart:
// In addition to resetting the extraTxnState, the restart event may
// also need to reset the sqlliveness.Session.
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent)
if err := ex.maybeSetSQLLivenessSession(); err != nil {
return advanceInfo{}, err
}
default:
return advanceInfo{}, errors.AssertionFailedf(
"unexpected event: %v", errors.Safe(advInfo.txnEvent))
}
return advInfo, nil
}

func (ex *connExecutor) maybeSetSQLLivenessSession() error {
if !ex.server.cfg.Codec.ForSystemTenant() ||
ex.server.cfg.TestingKnobs.ForceSQLLivenessSession {
// Update the leased descriptor collection with the current sqlliveness.Session.
// This is required in the multi-tenant environment to update the transaction
// deadline to either the session expiry or the leased descriptor deadline,
// whichever is sooner. We need this to ensure that transactions initiated
// by ephemeral SQL pods in multi-tenant environments are committed before the
// session expires.
session, err := ex.server.cfg.SQLLiveness.Session(ex.Ctx())
if err != nil {
return err
}
ex.extraTxnState.descCollection.SetSession(session)
}
return nil
}

func (ex *connExecutor) handleWaitingForConcurrentSchemaChanges(
ctx context.Context, descID descpb.ID,
) error {
Expand Down
18 changes: 0 additions & 18 deletions pkg/sql/conn_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1264,24 +1264,6 @@ CREATE TABLE t1.test (k INT PRIMARY KEY, v TEXT);
locked(func() { require.True(t, mu.txnDeadline.Less(fs.Expiration())) })
})

t.Run("expired session leads to clear error", func(t *testing.T) {
// In this test we use an intentionally expired session in the tenant
// and observe that we get a clear error indicating that the session
// was expired.
sessionDuration := -time.Nanosecond
fs := fakeSession{
ExpTS: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0),
}
defer setClientSessionOverride(&fs)()
txn, err := sqlConn.Begin()
if err != nil {
t.Fatal(err)
}
_, err = txn.ExecContext(ctx, "UPSERT INTO t1.test(k, v) VALUES (1, 'abc')")
require.Regexp(t, `liveness session expired (\S+) before transaction`, err)
require.NoError(t, txn.Rollback())
})

t.Run("single_tenant_ignore_session_expiry", func(t *testing.T) {
// In this test, we check that the session expiry is ignored in a single-tenant
// environment. To verify this, we deliberately set the session duration to be
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1595,6 +1595,10 @@ type ExecutorTestingKnobs struct {

// BeforeCopyFromInsert, if set, will be called during a COPY FROM insert statement.
BeforeCopyFromInsert func() error

// ForceSQLLivenessSession will force the use of a sqlliveness session for
// transaction deadlines even in the system tenant.
ForceSQLLivenessSession bool
}

// PGWireTestingKnobs contains knobs for the pgwire module.
Expand Down

0 comments on commit a7ab071

Please sign in to comment.