Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sql: make deadline errors retryable #99760

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,7 @@ go_test(
"//pkg/sql/sessionphase",
"//pkg/sql/sqlinstance",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/sqllivenesstestutils",
"//pkg/sql/sqlstats",
"//pkg/sql/sqltestutils",
"//pkg/sql/stats",
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/catalog/descs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,24 @@ go_test(
"//pkg/sql/sem/catid",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/sqllivenesstestutils",
"//pkg/sql/tests",
"//pkg/sql/types",
"//pkg/testutils/datapathutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@com_github_lib_pq//oid",
"@com_github_stretchr_testify//require",
Expand Down
41 changes: 31 additions & 10 deletions pkg/sql/catalog/descs/leased_descriptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package descs

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand Down Expand Up @@ -198,16 +199,7 @@ func (ld *leasedDescriptors) maybeUpdateDeadline(
// expiration as the deadline will serve a purpose.
var deadline hlc.Timestamp
if session != nil {
if expiration, txnTS := session.Expiration(), txn.ReadTimestamp(); txnTS.Less(expiration) {
deadline = expiration
} else {
// If the session has expired relative to this transaction, propagate
// a clear error that that's what is going on.
return errors.Errorf(
"liveness session expired %s before transaction",
txnTS.GoTime().Sub(expiration.GoTime()),
)
}
deadline = session.Expiration()
}
if leaseDeadline, ok := ld.getDeadline(); ok && (deadline.IsEmpty() || leaseDeadline.Less(deadline)) {
// Set the deadline to the lease deadline if session expiration is empty
Expand All @@ -216,11 +208,40 @@ func (ld *leasedDescriptors) maybeUpdateDeadline(
}
// If the deadline has been set, update the transaction deadline.
if !deadline.IsEmpty() {
// If the deadline certainly cannot be met, return an error which will
// be retried explicitly.
if txnTs := txn.ReadTimestamp(); deadline.LessEq(txnTs) {
return &deadlineExpiredError{
txnTS: txnTs,
expiration: deadline,
}
}
return txn.UpdateDeadline(ctx, deadline)
}
return nil
}

// deadlineExpiredError is returned when the deadline from either a descriptor
// lease or a sqlliveness session is before the current transaction timestamp.
// The error is a user-visible retry.
type deadlineExpiredError struct {
txnTS, expiration hlc.Timestamp
}

func (e *deadlineExpiredError) SafeFormatError(p errors.Printer) (next error) {
p.Printf("liveness session expired %v before transaction",
e.txnTS.GoTime().Sub(e.expiration.GoTime()))
return nil
}

func (e *deadlineExpiredError) ClientVisibleRetryError() {}

func (e *deadlineExpiredError) Error() string {
return fmt.Sprint(errors.Formattable(e))
}

var _ errors.SafeFormatter = (*deadlineExpiredError)(nil)

func (ld *leasedDescriptors) getDeadline() (deadline hlc.Timestamp, haveDeadline bool) {
_ = ld.cache.IterateByID(func(descriptor catalog.NameEntry) error {
expiration := descriptor.(lease.LeasedDescriptor).Expiration()
Expand Down
96 changes: 96 additions & 0 deletions pkg/sql/catalog/descs/txn_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,25 @@ package descs_test

import (
"context"
gosql "database/sql"
"math/rand"
"testing"

"github.com/cockroachdb/cockroach-go/v2/crdb"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -69,3 +79,89 @@ func TestTxnWithStepping(t *testing.T) {
return nil
}, isql.SteppingEnabled()))
}

// TestLivenessSessionExpiredErrorResultsInRestartAtSQLLayer ensures that if
// a transaction sees a deadline which is in the past of the current
// transaction timestamp, it'll propagate a retriable error. This should only
// happen in transient situations.
//
// Note that the sqlliveness session is not the only source of a deadline. In
// fact, it's probably more common that schema changes lead to descriptor
// deadlines kicking in. By default, at time of writing, sqlliveness sessions
// only apply to secondary tenants. The test leverages them because they are
// the easiest way to interact with the deadline at a session level.
func TestLivenessSessionExpiredErrorResultsInRestartAtSQLLayer(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
var (
toFail = struct {
syncutil.Mutex
remaining int
}{}
shouldFail = func() bool {
toFail.Lock()
defer toFail.Unlock()
if toFail.remaining > 0 {
toFail.remaining--
return true
}
return false
}
checkFailed = func(t *testing.T) {
toFail.Lock()
defer toFail.Unlock()
require.Zero(t, toFail.remaining)
}
setRemaining = func(t *testing.T, n int) {
toFail.Lock()
defer toFail.Unlock()
toFail.remaining = n
}
)
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
ForceSQLLivenessSession: true,
},
SQLLivenessKnobs: &sqlliveness.TestingKnobs{
SessionOverride: func(ctx context.Context) (sqlliveness.Session, error) {
// Only client sessions have the client tag. We control the only
// client session.
tags := logtags.FromContext(ctx)
if _, hasClient := tags.GetTag("client"); hasClient && shouldFail() {
// This fake session is certainly expired.
return &sqllivenesstestutils.FakeSession{
ExpTS: hlc.Timestamp{WallTime: 1},
}, nil
}
return nil, nil
},
},
},
})
defer s.Stopper().Stop(ctx)

tdb := sqlutils.MakeSQLRunner(sqlDB)
tdb.Exec(t, "CREATE TABLE t (i INT PRIMARY KEY)")

// Ensure that the internal retry works seamlessly
t.Run("auto-retry", func(t *testing.T) {
setRemaining(t, rand.Intn(20))
tdb.Exec(t, `SELECT * FROM t`)
checkFailed(t)
})
t.Run("explicit transaction", func(t *testing.T) {
setRemaining(t, rand.Intn(20))
require.NoError(t, crdb.ExecuteTx(ctx, sqlDB, nil, func(tx *gosql.Tx) error {
_, err := tx.Exec("SELECT * FROM t")
if err != nil {
return err
}
_, err = tx.Exec(`INSERT INTO t VALUES (1)`)
return err
}))
checkFailed(t)
})
}
50 changes: 34 additions & 16 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3102,12 +3102,15 @@ var retriableMinTimestampBoundUnsatisfiableError = errors.Newf(
"retriable MinTimestampBoundUnsatisfiableError",
)

// errIsRetriable is true if the error is a client-visible retry error
// or the error is a special error that is handled internally and retried.
func errIsRetriable(err error) bool {
return errors.HasType(err, (*kvpb.TransactionRetryWithProtoRefreshError)(nil)) ||
scerrors.ConcurrentSchemaChangeDescID(err) != descpb.InvalidID ||
return errors.HasInterface(err, (*pgerror.ClientVisibleRetryError)(nil)) ||
errors.Is(err, retriableMinTimestampBoundUnsatisfiableError) ||
// Note that this error is not handled internally and can make it to the
// client in implicit transactions. This is not great; it should
// be marked as a client visible retry error.
errors.Is(err, descidgen.ErrDescIDSequenceMigrationInProgress) ||
execinfra.IsDynamicQueryHasNoHomeRegionError(err) ||
descs.IsTwoVersionInvariantViolationError(err)
}

Expand Down Expand Up @@ -3517,18 +3520,8 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
// Session is considered active when executing a transaction.
ex.totalActiveTimeStopWatch.Start()

if !ex.server.cfg.Codec.ForSystemTenant() {
// Update the leased descriptor collection with the current sqlliveness.Session.
// This is required in the multi-tenant environment to update the transaction
// deadline to either the session expiry or the leased descriptor deadline,
// whichever is sooner. We need this to ensure that transactions initiated
// by ephemeral SQL pods in multi-tenant environments are committed before the
// session expires.
session, err := ex.server.cfg.SQLLiveness.Session(ex.Ctx())
if err != nil {
return advanceInfo{}, err
}
ex.extraTxnState.descCollection.SetSession(session)
if err := ex.maybeSetSQLLivenessSession(); err != nil {
return advanceInfo{}, err
}
case txnCommit:
if res.Err() != nil {
Expand Down Expand Up @@ -3605,15 +3598,40 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
}

fallthrough
case txnRestart, txnRollback:
case txnRollback:
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent)
case txnRestart:
// In addition to resetting the extraTxnState, the restart event may
// also need to reset the sqlliveness.Session.
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent)
if err := ex.maybeSetSQLLivenessSession(); err != nil {
return advanceInfo{}, err
}
default:
return advanceInfo{}, errors.AssertionFailedf(
"unexpected event: %v", errors.Safe(advInfo.txnEvent))
}
return advInfo, nil
}

func (ex *connExecutor) maybeSetSQLLivenessSession() error {
if !ex.server.cfg.Codec.ForSystemTenant() ||
ex.server.cfg.TestingKnobs.ForceSQLLivenessSession {
// Update the leased descriptor collection with the current sqlliveness.Session.
// This is required in the multi-tenant environment to update the transaction
// deadline to either the session expiry or the leased descriptor deadline,
// whichever is sooner. We need this to ensure that transactions initiated
// by ephemeral SQL pods in multi-tenant environments are committed before the
// session expires.
session, err := ex.server.cfg.SQLLiveness.Session(ex.Ctx())
if err != nil {
return err
}
ex.extraTxnState.descCollection.SetSession(session)
}
return nil
}

func (ex *connExecutor) handleWaitingForConcurrentSchemaChanges(
ctx context.Context, descID descpb.ID,
) error {
Expand Down
38 changes: 11 additions & 27 deletions pkg/sql/conn_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/pgtest"
Expand Down Expand Up @@ -1211,12 +1212,15 @@ CREATE DATABASE t1;
CREATE TABLE t1.test (k INT PRIMARY KEY, v TEXT);
`)

type fakeSession = sqllivenesstestutils.FakeSession
t.Run("session_expiry_overrides_lease_deadline", func(t *testing.T) {
// Deliberately set the sessionDuration to be less than the lease duration
// to confirm that the sessionDuration overrides the lease duration while
// setting the transaction deadline.
sessionDuration := base.DefaultDescriptorLeaseDuration - time.Minute
fs := fakeSession{exp: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0)}
fs := fakeSession{
ExpTS: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0),
}
defer setClientSessionOverride(&fs)()

txn, err := sqlConn.Begin()
Expand All @@ -1239,7 +1243,9 @@ CREATE TABLE t1.test (k INT PRIMARY KEY, v TEXT);
// to confirm that the lease duration overrides the session duration while
// setting the transaction deadline
sessionDuration := base.DefaultDescriptorLeaseDuration + time.Minute
fs := fakeSession{exp: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0)}
fs := fakeSession{
ExpTS: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0),
}
defer setClientSessionOverride(&fs)()

txn, err := sqlConn.Begin()
Expand All @@ -1258,22 +1264,6 @@ CREATE TABLE t1.test (k INT PRIMARY KEY, v TEXT);
locked(func() { require.True(t, mu.txnDeadline.Less(fs.Expiration())) })
})

t.Run("expired session leads to clear error", func(t *testing.T) {
// In this test we use an intentionally expired session in the tenant
// and observe that we get a clear error indicating that the session
// was expired.
sessionDuration := -time.Nanosecond
fs := fakeSession{exp: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0)}
defer setClientSessionOverride(&fs)()
txn, err := sqlConn.Begin()
if err != nil {
t.Fatal(err)
}
_, err = txn.ExecContext(ctx, "UPSERT INTO t1.test(k, v) VALUES (1, 'abc')")
require.Regexp(t, `liveness session expired (\S+) before transaction`, err)
require.NoError(t, txn.Rollback())
})

t.Run("single_tenant_ignore_session_expiry", func(t *testing.T) {
// In this test, we check that the session expiry is ignored in a single-tenant
// environment. To verify this, we deliberately set the session duration to be
Expand All @@ -1293,7 +1283,9 @@ CREATE TABLE t1.test (k INT PRIMARY KEY, v TEXT);
}

// Inject an already expired session to observe that it has no effect.
fs := &fakeSession{exp: s.Clock().Now().Add(-time.Minute.Nanoseconds(), 0)}
fs := &fakeSession{
ExpTS: s.Clock().Now().Add(-time.Minute.Nanoseconds(), 0),
}
defer setClientSessionOverride(fs)()
txn, err := dbConn.Begin()
if err != nil {
Expand Down Expand Up @@ -1855,14 +1847,6 @@ func noopRequestFilter(ctx context.Context, request *kvpb.BatchRequest) *kvpb.Er
return nil
}

type fakeSession struct{ exp hlc.Timestamp }

func (f fakeSession) ID() sqlliveness.SessionID { return "foo" }
func (f fakeSession) Expiration() hlc.Timestamp { return f.exp }
func (f fakeSession) Start() hlc.Timestamp { panic("unimplemented") }

var _ sqlliveness.Session = (*fakeSession)(nil)

func getTxnID(t *testing.T, tx *gosql.Tx) (id string) {
t.Helper()
sqlutils.MakeSQLRunner(tx).QueryRow(t, `
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1595,6 +1595,10 @@ type ExecutorTestingKnobs struct {

// BeforeCopyFromInsert, if set, will be called during a COPY FROM insert statement.
BeforeCopyFromInsert func() error

// ForceSQLLivenessSession will force the use of a sqlliveness session for
// transaction deadlines even in the system tenant.
ForceSQLLivenessSession bool
}

// PGWireTestingKnobs contains knobs for the pgwire module.
Expand Down
Loading