Skip to content

Commit

Permalink
Merge pull request cockroachdb#100255 from ajwerner/backport22.2-99760
Browse files Browse the repository at this point in the history
  • Loading branch information
ajwerner authored Apr 3, 2023
2 parents 360d268 + 9e74ade commit 9e4daf6
Show file tree
Hide file tree
Showing 11 changed files with 234 additions and 56 deletions.
1 change: 1 addition & 0 deletions pkg/sql/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ go_test(
"//pkg/sql/sessiondatapb",
"//pkg/sql/sessionphase",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/sqllivenesstestutils",
"//pkg/sql/sqlstats",
"//pkg/sql/sqltestutils",
"//pkg/sql/sqlutil",
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/catalog/descs/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -112,19 +112,25 @@ go_test(
"//pkg/sql/sem/catid",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/sqllivenesstestutils",
"//pkg/sql/sqlutil",
"//pkg/sql/tests",
"//pkg/sql/types",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_logtags//:logtags",
"@com_github_cockroachdb_redact//:redact",
"@com_github_lib_pq//oid",
"@com_github_stretchr_testify//require",
Expand Down
41 changes: 31 additions & 10 deletions pkg/sql/catalog/descs/leased_descriptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package descs

import (
"context"
"fmt"

"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
Expand Down Expand Up @@ -199,16 +200,7 @@ func (ld *leasedDescriptors) maybeUpdateDeadline(
// SQL pods.
var deadline hlc.Timestamp
if session != nil {
if expiration, txnTS := session.Expiration(), txn.ReadTimestamp(); txnTS.Less(expiration) {
deadline = expiration
} else {
// If the session has expired relative to this transaction, propagate
// a clear error that that's what is going on.
return errors.Errorf(
"liveness session expired %s before transaction",
txnTS.GoTime().Sub(expiration.GoTime()),
)
}
deadline = session.Expiration()
}
if leaseDeadline, ok := ld.getDeadline(); ok && (deadline.IsEmpty() || leaseDeadline.Less(deadline)) {
// Set the deadline to the lease deadline if session expiration is empty
Expand All @@ -217,11 +209,40 @@ func (ld *leasedDescriptors) maybeUpdateDeadline(
}
// If the deadline has been set, update the transaction deadline.
if !deadline.IsEmpty() {
// If the deadline certainly cannot be met, return an error which will
// be retried explicitly.
if txnTs := txn.ReadTimestamp(); deadline.LessEq(txnTs) {
return &deadlineExpiredError{
txnTS: txnTs,
expiration: deadline,
}
}
return txn.UpdateDeadline(ctx, deadline)
}
return nil
}

// deadlineExpiredError is returned when the deadline from either a descriptor
// lease or a sqlliveness session is before the current transaction timestamp.
// The error is a user-visible retry.
type deadlineExpiredError struct {
txnTS, expiration hlc.Timestamp
}

func (e *deadlineExpiredError) SafeFormatError(p errors.Printer) (next error) {
p.Printf("liveness session expired %v before transaction",
e.txnTS.GoTime().Sub(e.expiration.GoTime()))
return nil
}

func (e *deadlineExpiredError) ClientVisibleRetryError() {}

func (e *deadlineExpiredError) Error() string {
return fmt.Sprint(errors.Formattable(e))
}

var _ errors.SafeFormatter = (*deadlineExpiredError)(nil)

func (ld *leasedDescriptors) getDeadline() (deadline hlc.Timestamp, haveDeadline bool) {
_ = ld.cache.IterateByID(func(descriptor catalog.NameEntry) error {
expiration := descriptor.(lease.LeasedDescriptor).Expiration()
Expand Down
102 changes: 102 additions & 0 deletions pkg/sql/catalog/descs/txn_external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,26 @@ package descs_test

import (
"context"
gosql "database/sql"
"math/rand"
"testing"

"github.com/cockroachdb/cockroach-go/v2/crdb"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -70,3 +80,95 @@ func TestTxnWithStepping(t *testing.T) {
return nil
}, sqlutil.SteppingEnabled()))
}

// TestLivenessSessionExpiredErrorResultsInRestartAtSQLLayer ensures that if
// a transaction sees a deadline which is in the past of the current
// transaction timestamp, it'll propagate a retriable error. This should only
// happen in transient situations.
//
// Note that the sqlliveness session is not the only source of a deadline. In
// fact, it's probably more common that schema changes lead to descriptor
// deadlines kicking in. By default, at time of writing, sqlliveness sessions
// only apply to secondary tenants. The test leverages them because they are
// the easiest way to interact with the deadline at a session level.
func TestLivenessSessionExpiredErrorResultsInRestartAtSQLLayer(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
var (
toFail = struct {
syncutil.Mutex
remaining int
}{}
shouldFail = func() bool {
toFail.Lock()
defer toFail.Unlock()
if toFail.remaining > 0 {
toFail.remaining--
return true
}
return false
}
checkFailed = func(t *testing.T) {
toFail.Lock()
defer toFail.Unlock()
require.Zero(t, toFail.remaining)
}
setRemaining = func(t *testing.T, n int) {
toFail.Lock()
defer toFail.Unlock()
toFail.remaining = n
}
)
s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
SQLExecutor: &sql.ExecutorTestingKnobs{
ForceSQLLivenessSession: true,
},
SQLLivenessKnobs: &sqlliveness.TestingKnobs{
SessionOverride: func(ctx context.Context) (sqlliveness.Session, error) {
// Only client sessions have the client tag. We control the only
// client session.
tags := logtags.FromContext(ctx)
var hasClient bool
for _, t := range tags.Get() {
if hasClient = t.Key() == "client"; hasClient {
break
}
}
if hasClient && shouldFail() {
// This fake session is certainly expired.
return &sqllivenesstestutils.FakeSession{
ExpTS: hlc.Timestamp{WallTime: 1},
}, nil
}
return nil, nil
},
},
},
})
defer s.Stopper().Stop(ctx)

tdb := sqlutils.MakeSQLRunner(sqlDB)
tdb.Exec(t, "CREATE TABLE t (i INT PRIMARY KEY)")

// Ensure that the internal retry works seamlessly
t.Run("auto-retry", func(t *testing.T) {
setRemaining(t, rand.Intn(20))
tdb.Exec(t, `SELECT * FROM t`)
checkFailed(t)
})
t.Run("explicit transaction", func(t *testing.T) {
setRemaining(t, rand.Intn(20))
require.NoError(t, crdb.ExecuteTx(ctx, sqlDB, nil, func(tx *gosql.Tx) error {
_, err := tx.Exec("SELECT * FROM t")
if err != nil {
return err
}
_, err = tx.Exec(`INSERT INTO t VALUES (1)`)
return err
}))
checkFailed(t)
})
}
46 changes: 31 additions & 15 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2753,9 +2753,10 @@ var retriableMinTimestampBoundUnsatisfiableError = errors.Newf(
"retriable MinTimestampBoundUnsatisfiableError",
)

// errIsRetriable is true if the error is a client-visible retry error
// or the error is a special error that is handled internally and retried.
func errIsRetriable(err error) bool {
return errors.HasType(err, (*roachpb.TransactionRetryWithProtoRefreshError)(nil)) ||
scerrors.ConcurrentSchemaChangeDescID(err) != descpb.InvalidID ||
return errors.HasInterface(err, (*pgerror.ClientVisibleRetryError)(nil)) ||
errors.Is(err, retriableMinTimestampBoundUnsatisfiableError) ||
descs.IsTwoVersionInvariantViolationError(err)
}
Expand Down Expand Up @@ -3130,18 +3131,8 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
// Session is considered active when executing a transaction.
ex.totalActiveTimeStopWatch.Start()

if !ex.server.cfg.Codec.ForSystemTenant() {
// Update the leased descriptor collection with the current sqlliveness.Session.
// This is required in the multi-tenant environment to update the transaction
// deadline to either the session expiry or the leased descriptor deadline,
// whichever is sooner. We need this to ensure that transactions initiated
// by ephemeral SQL pods in multi-tenant environments are committed before the
// session expires.
session, err := ex.server.cfg.SQLLiveness.Session(ex.Ctx())
if err != nil {
return advanceInfo{}, err
}
ex.extraTxnState.descCollection.SetSession(session)
if err := ex.maybeSetSQLLivenessSession(); err != nil {
return advanceInfo{}, err
}
case txnCommit:
if res.Err() != nil {
Expand Down Expand Up @@ -3206,15 +3197,40 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
ex.statsCollector.PhaseTimes().SetSessionPhaseTime(sessionphase.SessionEndPostCommitJob, timeutil.Now())

fallthrough
case txnRestart, txnRollback:
case txnRollback:
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent)
case txnRestart:
// In addition to resetting the extraTxnState, the restart event may
// also need to reset the sqlliveness.Session.
ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent)
if err := ex.maybeSetSQLLivenessSession(); err != nil {
return advanceInfo{}, err
}
default:
return advanceInfo{}, errors.AssertionFailedf(
"unexpected event: %v", errors.Safe(advInfo.txnEvent))
}
return advInfo, nil
}

func (ex *connExecutor) maybeSetSQLLivenessSession() error {
if !ex.server.cfg.Codec.ForSystemTenant() ||
ex.server.cfg.TestingKnobs.ForceSQLLivenessSession {
// Update the leased descriptor collection with the current sqlliveness.Session.
// This is required in the multi-tenant environment to update the transaction
// deadline to either the session expiry or the leased descriptor deadline,
// whichever is sooner. We need this to ensure that transactions initiated
// by ephemeral SQL pods in multi-tenant environments are committed before the
// session expires.
session, err := ex.server.cfg.SQLLiveness.Session(ex.Ctx())
if err != nil {
return err
}
ex.extraTxnState.descCollection.SetSession(session)
}
return nil
}

func (ex *connExecutor) handleWaitingForConcurrentSchemaChanges(
ctx context.Context, descID descpb.ID,
) error {
Expand Down
41 changes: 11 additions & 30 deletions pkg/sql/conn_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/pgtest"
Expand Down Expand Up @@ -1209,12 +1210,15 @@ CREATE DATABASE t1;
CREATE TABLE t1.test (k INT PRIMARY KEY, v TEXT);
`)

type fakeSession = sqllivenesstestutils.FakeSession
t.Run("session_expiry_overrides_lease_deadline", func(t *testing.T) {
// Deliberately set the sessionDuration to be less than the lease duration
// to confirm that the sessionDuration overrides the lease duration while
// setting the transaction deadline.
sessionDuration := base.DefaultDescriptorLeaseDuration - time.Minute
fs := fakeSession{exp: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0)}
fs := fakeSession{
ExpTS: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0),
}
defer setClientSessionOverride(&fs)()

txn, err := sqlConn.Begin()
Expand All @@ -1237,7 +1241,9 @@ CREATE TABLE t1.test (k INT PRIMARY KEY, v TEXT);
// to confirm that the lease duration overrides the session duration while
// setting the transaction deadline
sessionDuration := base.DefaultDescriptorLeaseDuration + time.Minute
fs := fakeSession{exp: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0)}
fs := fakeSession{
ExpTS: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0),
}
defer setClientSessionOverride(&fs)()

txn, err := sqlConn.Begin()
Expand All @@ -1256,22 +1262,6 @@ CREATE TABLE t1.test (k INT PRIMARY KEY, v TEXT);
locked(func() { require.True(t, mu.txnDeadline.Less(fs.Expiration())) })
})

t.Run("expired session leads to clear error", func(t *testing.T) {
// In this test we use an intentionally expired session in the tenant
// and observe that we get a clear error indicating that the session
// was expired.
sessionDuration := -time.Nanosecond
fs := fakeSession{exp: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0)}
defer setClientSessionOverride(&fs)()
txn, err := sqlConn.Begin()
if err != nil {
t.Fatal(err)
}
_, err = txn.ExecContext(ctx, "UPSERT INTO t1.test(k, v) VALUES (1, 'abc')")
require.Regexp(t, `liveness session expired (\S+) before transaction`, err)
require.NoError(t, txn.Rollback())
})

t.Run("single_tenant_ignore_session_expiry", func(t *testing.T) {
// In this test, we check that the session expiry is ignored in a single-tenant
// environment. To verify this, we deliberately set the session duration to be
Expand All @@ -1291,7 +1281,9 @@ CREATE TABLE t1.test (k INT PRIMARY KEY, v TEXT);
}

// Inject an already expired session to observe that it has no effect.
fs := &fakeSession{exp: s.Clock().Now().Add(-time.Minute.Nanoseconds(), 0)}
fs := &fakeSession{
ExpTS: s.Clock().Now().Add(-time.Minute.Nanoseconds(), 0),
}
defer setClientSessionOverride(fs)()
txn, err := dbConn.Begin()
if err != nil {
Expand Down Expand Up @@ -1809,17 +1801,6 @@ func noopRequestFilter(ctx context.Context, request roachpb.BatchRequest) *roach
return nil
}

type fakeSession struct{ exp hlc.Timestamp }

func (f fakeSession) ID() sqlliveness.SessionID { return "foo" }
func (f fakeSession) Expiration() hlc.Timestamp { return f.exp }
func (f fakeSession) Start() hlc.Timestamp { panic("unimplemented") }
func (f fakeSession) RegisterCallbackForSessionExpiry(func(ctx context.Context)) {
panic("unimplemented")
}

var _ sqlliveness.Session = (*fakeSession)(nil)

func getTxnID(t *testing.T, tx *gosql.Tx) (id string) {
t.Helper()
sqlutils.MakeSQLRunner(tx).QueryRow(t, `
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1512,6 +1512,10 @@ type ExecutorTestingKnobs struct {

// BeforeCopyFromInsert, if set, will be called during a COPY FROM insert statement.
BeforeCopyFromInsert func() error

// ForceSQLLivenessSession will force the use of a sqlliveness session for
// transaction deadlines even in the system tenant.
ForceSQLLivenessSession bool
}

// PGWireTestingKnobs contains knobs for the pgwire module.
Expand Down
Loading

0 comments on commit 9e4daf6

Please sign in to comment.