From 9941b2b1922295347e861e1846d6cd4b9f7e4200 Mon Sep 17 00:00:00 2001 From: Ricky Stewart Date: Tue, 28 Mar 2023 11:29:43 -0500 Subject: [PATCH 1/4] ci: don't stress `integration` tests Epic: none Release note: None --- pkg/cmd/github-pull-request-make/main.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/cmd/github-pull-request-make/main.go b/pkg/cmd/github-pull-request-make/main.go index 41d08c6178e7..2204a5dc2f12 100644 --- a/pkg/cmd/github-pull-request-make/main.go +++ b/pkg/cmd/github-pull-request-make/main.go @@ -237,7 +237,11 @@ func main() { // to strip out the unnecessary calls to `bazel`, but that might // better be saved for when we no longer need `make` support and // don't have to worry about accidentally breaking it. - out, err := exec.Command("bazel", "query", fmt.Sprintf("kind(go_test, //%s:all)", name), "--output=label").Output() + out, err := exec.Command( + "bazel", + "query", + fmt.Sprintf("kind(go_test, //%s:all) except attr(tags, \"integration\", //%s:all)", name, name), + "--output=label").Output() if err != nil { log.Fatal(err) } From 3801d9e7f06556feeecc5a6ee792c75404079a7e Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 27 Mar 2023 19:52:46 -0700 Subject: [PATCH 2/4] flowinfra: fix possible use of 'flow' span after Finish This commit fixes a recently introduced bug where we could use the already finished tracing span of the flow. In particular, this could occur since we now unconditionally call `ctxCancel` when connecting inbound streams if that connection fails for any reason. If that failure occurs after the flow has already been cleaned up, then its span would be finished, so further accesses are disallowed. This is now fixed by adjusting the existing `Flow.Cancel` method to consider whether the cleanup has already been performed. This required hiding the flow's status behind the mutex but it should have a negligible synchronization overhead. Release note: None --- pkg/sql/distsql_running.go | 10 +-- pkg/sql/flowinfra/BUILD.bazel | 1 + pkg/sql/flowinfra/flow.go | 83 ++++++++++++++++++------- pkg/sql/flowinfra/flow_registry.go | 4 +- pkg/sql/flowinfra/flow_registry_test.go | 11 ++-- 5 files changed, 73 insertions(+), 36 deletions(-) diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index a899ef84f888..7d83572f2076 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -576,11 +576,9 @@ func (dsp *DistSQLPlanner) setupFlows( // and we do so in a separate goroutine. // // We need to synchronize the new goroutine with flow.Cleanup() being called - // for two reasons: - // - flow.Cleanup() is the last thing before DistSQLPlanner.Run returns at - // which point the rowResultWriter is no longer protected by the mutex of - // the DistSQLReceiver - // - flow.Cancel can only be called before flow.Cleanup. + // since flow.Cleanup() is the last thing before DistSQLPlanner.Run returns + // at which point the rowResultWriter is no longer protected by the mutex of + // the DistSQLReceiver. cleanupCalledMu := struct { syncutil.Mutex called bool @@ -608,8 +606,6 @@ func (dsp *DistSQLPlanner) setupFlows( seenError = true func() { cleanupCalledMu.Lock() - // Flow.Cancel cannot be called after or concurrently with - // Flow.Cleanup. defer cleanupCalledMu.Unlock() if cleanupCalledMu.called { // Cleanup of the local flow has already been performed, diff --git a/pkg/sql/flowinfra/BUILD.bazel b/pkg/sql/flowinfra/BUILD.bazel index ef44773a2e7c..e28b4e3721dc 100644 --- a/pkg/sql/flowinfra/BUILD.bazel +++ b/pkg/sql/flowinfra/BUILD.bazel @@ -36,6 +36,7 @@ go_library( "//pkg/sql/types", "//pkg/util/admission", "//pkg/util/admission/admissionpb", + "//pkg/util/buildutil", "//pkg/util/cancelchecker", "//pkg/util/contextutil", "//pkg/util/log", diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index 6a14ccda81d0..c8944fdad9ab 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -24,10 +24,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/optional" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -151,8 +153,7 @@ type Flow interface { MemUsage() int64 // Cancel cancels the flow by canceling its context. Safe to be called from - // any goroutine but **cannot** be called after (or concurrently with) - // Cleanup. + // any goroutine. Cancel() // AddOnCleanupStart adds a callback to be executed at the very beginning of @@ -232,12 +233,19 @@ type FlowBase struct { statementSQL string - status flowStatus + mu struct { + syncutil.Mutex + status flowStatus + // Cancel function for ctx. Call this to cancel the flow (safe to be + // called multiple times). + // + // NB: must be used with care as this function should **not** be called + // once the Flow has been cleaned up. Consider using Flow.Cancel + // instead when unsure. + ctxCancel context.CancelFunc + } - // Cancel function for ctx. Call this to cancel the flow (safe to be called - // multiple times). - ctxCancel context.CancelFunc - ctxDone <-chan struct{} + ctxDone <-chan struct{} // sp is the span that this Flow runs in. Can be nil if no span was created // for the flow. Flow.Cleanup() finishes it. @@ -249,11 +257,23 @@ type FlowBase struct { admissionInfo admission.WorkInfo } +func (f *FlowBase) getStatus() flowStatus { + f.mu.Lock() + defer f.mu.Unlock() + return f.mu.status +} + +func (f *FlowBase) setStatus(status flowStatus) { + f.mu.Lock() + defer f.mu.Unlock() + f.mu.status = status +} + // Setup is part of the Flow interface. func (f *FlowBase) Setup( ctx context.Context, spec *execinfrapb.FlowSpec, _ FuseOpt, ) (context.Context, execopnode.OpChains, error) { - ctx, f.ctxCancel = contextutil.WithCancel(ctx) + ctx, f.mu.ctxCancel = contextutil.WithCancel(ctx) f.ctxDone = ctx.Done() f.spec = spec return ctx, nil, nil @@ -288,7 +308,7 @@ func (f *FlowBase) SetStartedGoroutines(val bool) { // Started returns true if f has either been Run() or Start()ed. func (f *FlowBase) Started() bool { - return f.status != flowNotStarted + return f.getStatus() != flowNotStarted } var _ Flow = &FlowBase{} @@ -331,7 +351,6 @@ func NewFlowBase( localVectorSources: localVectorSources, admissionInfo: admissionInfo, onCleanupEnd: onFlowCleanupEnd, - status: flowNotStarted, statementSQL: statementSQL, } } @@ -380,9 +399,10 @@ func (f *FlowBase) GetCtxDone() <-chan struct{} { } // GetCancelFlowFn returns the context cancellation function of the context of -// this flow. +// this flow. The returned function is only safe to be used before Flow.Cleanup +// has been called. func (f *FlowBase) GetCancelFlowFn() context.CancelFunc { - return f.ctxCancel + return f.mu.ctxCancel } // SetProcessorsAndOutputs overrides the current f.processors and f.outputs with @@ -466,7 +486,7 @@ func (f *FlowBase) StartInternal( } } - f.status = flowRunning + f.setStatus(flowRunning) if multitenant.TenantRUEstimateEnabled.Get(&f.Cfg.Settings.SV) && !f.Gateway && f.CollectStats { @@ -479,7 +499,10 @@ func (f *FlowBase) StartInternal( log.Infof(ctx, "registered flow %s", f.ID.Short()) } for _, s := range f.startables { - s.Start(ctx, &f.waitGroup, f.ctxCancel) + // Note that it is safe to pass the context cancellation function + // directly since the main goroutine of the Flow will block until all + // startable goroutines exit. + s.Start(ctx, &f.waitGroup, f.mu.ctxCancel) } for i := 0; i < len(processors); i++ { f.waitGroup.Add(1) @@ -561,9 +584,13 @@ func (f *FlowBase) Wait() { var panicVal interface{} if panicVal = recover(); panicVal != nil { // If Wait is called as part of stack unwinding during a panic, the flow - // context must be canceled to ensure that all asynchronous goroutines get - // the message that they must exit (otherwise we will wait indefinitely). - f.ctxCancel() + // context must be canceled to ensure that all asynchronous goroutines + // get the message that they must exit (otherwise we will wait + // indefinitely). + // + // Cleanup is only called _after_ Wait, so it's safe to use ctxCancel + // directly. + f.mu.ctxCancel() } waitChan := make(chan struct{}) @@ -593,7 +620,13 @@ func (f *FlowBase) MemUsage() int64 { // Cancel is part of the Flow interface. func (f *FlowBase) Cancel() { - f.ctxCancel() + f.mu.Lock() + defer f.mu.Unlock() + if f.mu.status == flowFinished { + // The Flow is already done, nothing to cancel. + return + } + f.mu.ctxCancel() } // AddOnCleanupStart is part of the Flow interface. @@ -624,11 +657,13 @@ func (f *FlowBase) GetOnCleanupFns() (startCleanup, endCleanup func()) { } // Cleanup is part of the Flow interface. -// NOTE: this implements only the shared clean up logic between row-based and +// NOTE: this implements only the shared cleanup logic between row-based and // vectorized flows. func (f *FlowBase) Cleanup(ctx context.Context) { - if f.status == flowFinished { - panic("flow cleanup called twice") + if buildutil.CrdbTestBuild { + if f.getStatus() == flowFinished { + panic("flow cleanup called twice") + } } // Release any descriptors accessed by this flow. @@ -675,8 +710,10 @@ func (f *FlowBase) Cleanup(ctx context.Context) { if !f.IsLocal() && f.Started() { f.flowRegistry.UnregisterFlow(f.ID) } - f.status = flowFinished - f.ctxCancel() + // Importantly, we must mark the Flow as finished before f.sp is finished in + // the defer above. + f.setStatus(flowFinished) + f.mu.ctxCancel() } // cancel cancels all unconnected streams of this flow. This function is called diff --git a/pkg/sql/flowinfra/flow_registry.go b/pkg/sql/flowinfra/flow_registry.go index f262133a604a..dcfa782987cc 100644 --- a/pkg/sql/flowinfra/flow_registry.go +++ b/pkg/sql/flowinfra/flow_registry.go @@ -462,7 +462,7 @@ func (fr *FlowRegistry) Drain( // f.flow might be nil when ConnectInboundStream() was // called, but the consumer of that inbound stream hasn't // been scheduled yet. - f.flow.ctxCancel() + f.flow.Cancel() } } fr.Unlock() @@ -583,7 +583,7 @@ func (fr *FlowRegistry) ConnectInboundStream( // query execution will fail, so we cancel the flow on this node. If // this node is the gateway, this might actually be required for // proper shutdown of the whole distributed plan. - flow.ctxCancel() + flow.Cancel() } }() diff --git a/pkg/sql/flowinfra/flow_registry_test.go b/pkg/sql/flowinfra/flow_registry_test.go index 4a2ed8670bbb..0468c7afeb51 100644 --- a/pkg/sql/flowinfra/flow_registry_test.go +++ b/pkg/sql/flowinfra/flow_registry_test.go @@ -212,7 +212,8 @@ func TestStreamConnectionTimeout(t *testing.T) { // Register a flow with a very low timeout. After it times out, we'll attempt // to connect a stream, but it'll be too late. id1 := execinfrapb.FlowID{UUID: uuid.MakeV4()} - f1 := &FlowBase{ctxCancel: func() {}} + f1 := &FlowBase{} + f1.mu.ctxCancel = func() {} streamID1 := execinfrapb.StreamID(1) consumer := &distsqlutils.RowBuffer{} wg := &sync.WaitGroup{} @@ -370,7 +371,8 @@ func TestFlowRegistryDrain(t *testing.T) { ctx := context.Background() reg := NewFlowRegistry() - flow := &FlowBase{ctxCancel: func() {}} + flow := &FlowBase{} + flow.mu.ctxCancel = func() {} id := execinfrapb.FlowID{UUID: uuid.MakeV4()} registerFlow := func(t *testing.T, id execinfrapb.FlowID) { t.Helper() @@ -706,9 +708,10 @@ func TestErrorOnSlowHandshake(t *testing.T) { flowID := execinfrapb.FlowID{UUID: uuid.MakeV4()} streamID := execinfrapb.StreamID(1) cancelCh := make(chan struct{}) - f := &FlowBase{ctxCancel: func() { + f := &FlowBase{} + f.mu.ctxCancel = func() { cancelCh <- struct{}{} - }} + } serverStream, _ /* clientStream */, cleanup := createDummyStream(t) defer cleanup() From ffb16cab14eaad238872636d7f83c5c3a2c4cd44 Mon Sep 17 00:00:00 2001 From: ajwerner Date: Tue, 28 Mar 2023 09:49:23 -0400 Subject: [PATCH 3/4] sqllivenesstestutils: add FakeSession Release note: None --- pkg/sql/BUILD.bazel | 1 + pkg/sql/conn_executor_test.go | 26 +++++++------- .../sqllivenesstestutils/BUILD.bazel | 5 ++- .../sqllivenesstestutils/fake_session.go | 34 +++++++++++++++++++ 4 files changed, 53 insertions(+), 13 deletions(-) create mode 100644 pkg/sql/sqlliveness/sqllivenesstestutils/fake_session.go diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index dfafaedccefd..ed72f6450f65 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -806,6 +806,7 @@ go_test( "//pkg/sql/sessionphase", "//pkg/sql/sqlinstance", "//pkg/sql/sqlliveness", + "//pkg/sql/sqlliveness/sqllivenesstestutils", "//pkg/sql/sqlstats", "//pkg/sql/sqltestutils", "//pkg/sql/stats", diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go index 9464b26341ab..7b8a1404307b 100644 --- a/pkg/sql/conn_executor_test.go +++ b/pkg/sql/conn_executor_test.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/pgtest" @@ -1211,12 +1212,15 @@ CREATE DATABASE t1; CREATE TABLE t1.test (k INT PRIMARY KEY, v TEXT); `) + type fakeSession = sqllivenesstestutils.FakeSession t.Run("session_expiry_overrides_lease_deadline", func(t *testing.T) { // Deliberately set the sessionDuration to be less than the lease duration // to confirm that the sessionDuration overrides the lease duration while // setting the transaction deadline. sessionDuration := base.DefaultDescriptorLeaseDuration - time.Minute - fs := fakeSession{exp: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0)} + fs := fakeSession{ + ExpTS: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0), + } defer setClientSessionOverride(&fs)() txn, err := sqlConn.Begin() @@ -1239,7 +1243,9 @@ CREATE TABLE t1.test (k INT PRIMARY KEY, v TEXT); // to confirm that the lease duration overrides the session duration while // setting the transaction deadline sessionDuration := base.DefaultDescriptorLeaseDuration + time.Minute - fs := fakeSession{exp: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0)} + fs := fakeSession{ + ExpTS: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0), + } defer setClientSessionOverride(&fs)() txn, err := sqlConn.Begin() @@ -1263,7 +1269,9 @@ CREATE TABLE t1.test (k INT PRIMARY KEY, v TEXT); // and observe that we get a clear error indicating that the session // was expired. sessionDuration := -time.Nanosecond - fs := fakeSession{exp: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0)} + fs := fakeSession{ + ExpTS: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0), + } defer setClientSessionOverride(&fs)() txn, err := sqlConn.Begin() if err != nil { @@ -1293,7 +1301,9 @@ CREATE TABLE t1.test (k INT PRIMARY KEY, v TEXT); } // Inject an already expired session to observe that it has no effect. - fs := &fakeSession{exp: s.Clock().Now().Add(-time.Minute.Nanoseconds(), 0)} + fs := &fakeSession{ + ExpTS: s.Clock().Now().Add(-time.Minute.Nanoseconds(), 0), + } defer setClientSessionOverride(fs)() txn, err := dbConn.Begin() if err != nil { @@ -1855,14 +1865,6 @@ func noopRequestFilter(ctx context.Context, request *kvpb.BatchRequest) *kvpb.Er return nil } -type fakeSession struct{ exp hlc.Timestamp } - -func (f fakeSession) ID() sqlliveness.SessionID { return "foo" } -func (f fakeSession) Expiration() hlc.Timestamp { return f.exp } -func (f fakeSession) Start() hlc.Timestamp { panic("unimplemented") } - -var _ sqlliveness.Session = (*fakeSession)(nil) - func getTxnID(t *testing.T, tx *gosql.Tx) (id string) { t.Helper() sqlutils.MakeSQLRunner(tx).QueryRow(t, ` diff --git a/pkg/sql/sqlliveness/sqllivenesstestutils/BUILD.bazel b/pkg/sql/sqlliveness/sqllivenesstestutils/BUILD.bazel index d60d183bca40..e30a5fb165e0 100644 --- a/pkg/sql/sqlliveness/sqllivenesstestutils/BUILD.bazel +++ b/pkg/sql/sqlliveness/sqllivenesstestutils/BUILD.bazel @@ -3,7 +3,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "sqllivenesstestutils", - srcs = ["alwaysalivesession.go"], + srcs = [ + "alwaysalivesession.go", + "fake_session.go", + ], importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils", visibility = ["//visibility:public"], deps = [ diff --git a/pkg/sql/sqlliveness/sqllivenesstestutils/fake_session.go b/pkg/sql/sqlliveness/sqllivenesstestutils/fake_session.go new file mode 100644 index 000000000000..e084f07e2058 --- /dev/null +++ b/pkg/sql/sqlliveness/sqllivenesstestutils/fake_session.go @@ -0,0 +1,34 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sqllivenesstestutils + +import ( + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/util/hlc" +) + +// FakeSession is an implementation of sqlliveness.Session for testing. +type FakeSession struct { + SessionID sqlliveness.SessionID + ExpTS hlc.Timestamp + StartTS hlc.Timestamp +} + +var _ sqlliveness.Session = (*FakeSession)(nil) + +// ID returns f.SessionID. +func (f *FakeSession) ID() sqlliveness.SessionID { return f.SessionID } + +// Expiration returns f.ExpTS. +func (f *FakeSession) Expiration() hlc.Timestamp { return f.ExpTS } + +// Start return f.StartTS. +func (f *FakeSession) Start() hlc.Timestamp { return f.StartTS } From d00dacedd5a36a7ae63ae59d85c888af28e1f295 Mon Sep 17 00:00:00 2001 From: ajwerner Date: Tue, 28 Mar 2023 00:46:19 -0400 Subject: [PATCH 4/4] sql: make deadline errors retryable If a transaction gets finds itself with a deadline before its current timestamp, something that can happen when there are schema changes or when the sqlliveness subsystem is unavailable, we ought to retry the transaction. Today, an assertion failure is sent. This is the wrong behavior. In order to achieve the desired goal, we detect the scenario in MaybeUpdateDeadline and we return an error which implements the appropriate interface to be interpreted as a retry error in the pgwire and sql layers. This change does a few other little things: 1) It simplies the logic to check whether an error is retriable so that all errors which pgwire will treat as client-visible retries are treated as retriable internally. In addition, we retain certain additional checks for retriable errors the sql layer handles explicitly. 2) It makes sure to reset the sqlliveness session in the descs.Collection when restarting transactions. Not doing this, on the one hand, was an oversight that meant that if the session turned over internally, then restarts wouldn't see it. However, it's not actually a bug today because only secondary tenants set the session, and they never acquire a new session. The test utilizes sessions rather than descriptors because the testing knob infrastructure is easier to manipulate. Fixes: #96336 Backports will deal with #76727 Release note (bug fix): In rare cases involving overload and schema changes, users could sometimes, transiently, see errors of the form "deadline below read timestamp is nonsensical; txn has would have no chance to commit". These errors carried and internal pgcode and could not be retried. This form of error is now classified as a retriable error and will be retried automatically either by the client or internally. --- pkg/sql/catalog/descs/BUILD.bazel | 6 ++ pkg/sql/catalog/descs/leased_descriptors.go | 41 ++++++--- pkg/sql/catalog/descs/txn_external_test.go | 96 +++++++++++++++++++++ pkg/sql/conn_executor.go | 50 +++++++---- pkg/sql/conn_executor_test.go | 18 ---- pkg/sql/exec_util.go | 4 + pkg/sql/execinfra/BUILD.bazel | 1 + pkg/sql/execinfra/errors.go | 7 +- pkg/sql/schemachanger/scerrors/BUILD.bazel | 1 + pkg/sql/schemachanger/scerrors/errors.go | 3 + 10 files changed, 182 insertions(+), 45 deletions(-) diff --git a/pkg/sql/catalog/descs/BUILD.bazel b/pkg/sql/catalog/descs/BUILD.bazel index 883e5e32f46a..a7e544342dba 100644 --- a/pkg/sql/catalog/descs/BUILD.bazel +++ b/pkg/sql/catalog/descs/BUILD.bazel @@ -117,18 +117,24 @@ go_test( "//pkg/sql/sem/catid", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", + "//pkg/sql/sqlliveness", + "//pkg/sql/sqlliveness/sqllivenesstestutils", "//pkg/sql/tests", "//pkg/sql/types", "//pkg/testutils/datapathutils", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", + "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/mon", "//pkg/util/randutil", + "//pkg/util/syncutil", + "@com_github_cockroachdb_cockroach_go_v2//crdb", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", + "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", "@com_github_lib_pq//oid", "@com_github_stretchr_testify//require", diff --git a/pkg/sql/catalog/descs/leased_descriptors.go b/pkg/sql/catalog/descs/leased_descriptors.go index 24c0e22330ad..32933a1084f4 100644 --- a/pkg/sql/catalog/descs/leased_descriptors.go +++ b/pkg/sql/catalog/descs/leased_descriptors.go @@ -12,6 +12,7 @@ package descs import ( "context" + "fmt" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -198,16 +199,7 @@ func (ld *leasedDescriptors) maybeUpdateDeadline( // expiration as the deadline will serve a purpose. var deadline hlc.Timestamp if session != nil { - if expiration, txnTS := session.Expiration(), txn.ReadTimestamp(); txnTS.Less(expiration) { - deadline = expiration - } else { - // If the session has expired relative to this transaction, propagate - // a clear error that that's what is going on. - return errors.Errorf( - "liveness session expired %s before transaction", - txnTS.GoTime().Sub(expiration.GoTime()), - ) - } + deadline = session.Expiration() } if leaseDeadline, ok := ld.getDeadline(); ok && (deadline.IsEmpty() || leaseDeadline.Less(deadline)) { // Set the deadline to the lease deadline if session expiration is empty @@ -216,11 +208,40 @@ func (ld *leasedDescriptors) maybeUpdateDeadline( } // If the deadline has been set, update the transaction deadline. if !deadline.IsEmpty() { + // If the deadline certainly cannot be met, return an error which will + // be retried explicitly. + if txnTs := txn.ReadTimestamp(); deadline.LessEq(txnTs) { + return &deadlineExpiredError{ + txnTS: txnTs, + expiration: deadline, + } + } return txn.UpdateDeadline(ctx, deadline) } return nil } +// deadlineExpiredError is returned when the deadline from either a descriptor +// lease or a sqlliveness session is before the current transaction timestamp. +// The error is a user-visible retry. +type deadlineExpiredError struct { + txnTS, expiration hlc.Timestamp +} + +func (e *deadlineExpiredError) SafeFormatError(p errors.Printer) (next error) { + p.Printf("liveness session expired %v before transaction", + e.txnTS.GoTime().Sub(e.expiration.GoTime())) + return nil +} + +func (e *deadlineExpiredError) ClientVisibleRetryError() {} + +func (e *deadlineExpiredError) Error() string { + return fmt.Sprint(errors.Formattable(e)) +} + +var _ errors.SafeFormatter = (*deadlineExpiredError)(nil) + func (ld *leasedDescriptors) getDeadline() (deadline hlc.Timestamp, haveDeadline bool) { _ = ld.cache.IterateByID(func(descriptor catalog.NameEntry) error { expiration := descriptor.(lease.LeasedDescriptor).Expiration() diff --git a/pkg/sql/catalog/descs/txn_external_test.go b/pkg/sql/catalog/descs/txn_external_test.go index 89b00e29dccd..aff7073b9f8b 100644 --- a/pkg/sql/catalog/descs/txn_external_test.go +++ b/pkg/sql/catalog/descs/txn_external_test.go @@ -12,15 +12,25 @@ package descs_test import ( "context" + gosql "database/sql" + "math/rand" "testing" + "github.com/cockroachdb/cockroach-go/v2/crdb" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" "github.com/stretchr/testify/require" ) @@ -69,3 +79,89 @@ func TestTxnWithStepping(t *testing.T) { return nil }, isql.SteppingEnabled())) } + +// TestLivenessSessionExpiredErrorResultsInRestartAtSQLLayer ensures that if +// a transaction sees a deadline which is in the past of the current +// transaction timestamp, it'll propagate a retriable error. This should only +// happen in transient situations. +// +// Note that the sqlliveness session is not the only source of a deadline. In +// fact, it's probably more common that schema changes lead to descriptor +// deadlines kicking in. By default, at time of writing, sqlliveness sessions +// only apply to secondary tenants. The test leverages them because they are +// the easiest way to interact with the deadline at a session level. +func TestLivenessSessionExpiredErrorResultsInRestartAtSQLLayer(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var ( + toFail = struct { + syncutil.Mutex + remaining int + }{} + shouldFail = func() bool { + toFail.Lock() + defer toFail.Unlock() + if toFail.remaining > 0 { + toFail.remaining-- + return true + } + return false + } + checkFailed = func(t *testing.T) { + toFail.Lock() + defer toFail.Unlock() + require.Zero(t, toFail.remaining) + } + setRemaining = func(t *testing.T, n int) { + toFail.Lock() + defer toFail.Unlock() + toFail.remaining = n + } + ) + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SQLExecutor: &sql.ExecutorTestingKnobs{ + ForceSQLLivenessSession: true, + }, + SQLLivenessKnobs: &sqlliveness.TestingKnobs{ + SessionOverride: func(ctx context.Context) (sqlliveness.Session, error) { + // Only client sessions have the client tag. We control the only + // client session. + tags := logtags.FromContext(ctx) + if _, hasClient := tags.GetTag("client"); hasClient && shouldFail() { + // This fake session is certainly expired. + return &sqllivenesstestutils.FakeSession{ + ExpTS: hlc.Timestamp{WallTime: 1}, + }, nil + } + return nil, nil + }, + }, + }, + }) + defer s.Stopper().Stop(ctx) + + tdb := sqlutils.MakeSQLRunner(sqlDB) + tdb.Exec(t, "CREATE TABLE t (i INT PRIMARY KEY)") + + // Ensure that the internal retry works seamlessly + t.Run("auto-retry", func(t *testing.T) { + setRemaining(t, rand.Intn(20)) + tdb.Exec(t, `SELECT * FROM t`) + checkFailed(t) + }) + t.Run("explicit transaction", func(t *testing.T) { + setRemaining(t, rand.Intn(20)) + require.NoError(t, crdb.ExecuteTx(ctx, sqlDB, nil, func(tx *gosql.Tx) error { + _, err := tx.Exec("SELECT * FROM t") + if err != nil { + return err + } + _, err = tx.Exec(`INSERT INTO t VALUES (1)`) + return err + })) + checkFailed(t) + }) +} diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index ef8a0a0721df..57944392be2a 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -3102,12 +3102,15 @@ var retriableMinTimestampBoundUnsatisfiableError = errors.Newf( "retriable MinTimestampBoundUnsatisfiableError", ) +// errIsRetriable is true if the error is a client-visible retry error +// or the error is a special error that is handled internally and retried. func errIsRetriable(err error) bool { - return errors.HasType(err, (*kvpb.TransactionRetryWithProtoRefreshError)(nil)) || - scerrors.ConcurrentSchemaChangeDescID(err) != descpb.InvalidID || + return errors.HasInterface(err, (*pgerror.ClientVisibleRetryError)(nil)) || errors.Is(err, retriableMinTimestampBoundUnsatisfiableError) || + // Note that this error is not handled internally and can make it to the + // client in implicit transactions. This is not great; it should + // be marked as a client visible retry error. errors.Is(err, descidgen.ErrDescIDSequenceMigrationInProgress) || - execinfra.IsDynamicQueryHasNoHomeRegionError(err) || descs.IsTwoVersionInvariantViolationError(err) } @@ -3517,18 +3520,8 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( // Session is considered active when executing a transaction. ex.totalActiveTimeStopWatch.Start() - if !ex.server.cfg.Codec.ForSystemTenant() { - // Update the leased descriptor collection with the current sqlliveness.Session. - // This is required in the multi-tenant environment to update the transaction - // deadline to either the session expiry or the leased descriptor deadline, - // whichever is sooner. We need this to ensure that transactions initiated - // by ephemeral SQL pods in multi-tenant environments are committed before the - // session expires. - session, err := ex.server.cfg.SQLLiveness.Session(ex.Ctx()) - if err != nil { - return advanceInfo{}, err - } - ex.extraTxnState.descCollection.SetSession(session) + if err := ex.maybeSetSQLLivenessSession(); err != nil { + return advanceInfo{}, err } case txnCommit: if res.Err() != nil { @@ -3605,8 +3598,15 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( } fallthrough - case txnRestart, txnRollback: + case txnRollback: ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent) + case txnRestart: + // In addition to resetting the extraTxnState, the restart event may + // also need to reset the sqlliveness.Session. + ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent) + if err := ex.maybeSetSQLLivenessSession(); err != nil { + return advanceInfo{}, err + } default: return advanceInfo{}, errors.AssertionFailedf( "unexpected event: %v", errors.Safe(advInfo.txnEvent)) @@ -3614,6 +3614,24 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( return advInfo, nil } +func (ex *connExecutor) maybeSetSQLLivenessSession() error { + if !ex.server.cfg.Codec.ForSystemTenant() || + ex.server.cfg.TestingKnobs.ForceSQLLivenessSession { + // Update the leased descriptor collection with the current sqlliveness.Session. + // This is required in the multi-tenant environment to update the transaction + // deadline to either the session expiry or the leased descriptor deadline, + // whichever is sooner. We need this to ensure that transactions initiated + // by ephemeral SQL pods in multi-tenant environments are committed before the + // session expires. + session, err := ex.server.cfg.SQLLiveness.Session(ex.Ctx()) + if err != nil { + return err + } + ex.extraTxnState.descCollection.SetSession(session) + } + return nil +} + func (ex *connExecutor) handleWaitingForConcurrentSchemaChanges( ctx context.Context, descID descpb.ID, ) error { diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go index 7b8a1404307b..301ae303a785 100644 --- a/pkg/sql/conn_executor_test.go +++ b/pkg/sql/conn_executor_test.go @@ -1264,24 +1264,6 @@ CREATE TABLE t1.test (k INT PRIMARY KEY, v TEXT); locked(func() { require.True(t, mu.txnDeadline.Less(fs.Expiration())) }) }) - t.Run("expired session leads to clear error", func(t *testing.T) { - // In this test we use an intentionally expired session in the tenant - // and observe that we get a clear error indicating that the session - // was expired. - sessionDuration := -time.Nanosecond - fs := fakeSession{ - ExpTS: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0), - } - defer setClientSessionOverride(&fs)() - txn, err := sqlConn.Begin() - if err != nil { - t.Fatal(err) - } - _, err = txn.ExecContext(ctx, "UPSERT INTO t1.test(k, v) VALUES (1, 'abc')") - require.Regexp(t, `liveness session expired (\S+) before transaction`, err) - require.NoError(t, txn.Rollback()) - }) - t.Run("single_tenant_ignore_session_expiry", func(t *testing.T) { // In this test, we check that the session expiry is ignored in a single-tenant // environment. To verify this, we deliberately set the session duration to be diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index fe4fd1cbed3d..208a9042e8c3 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1595,6 +1595,10 @@ type ExecutorTestingKnobs struct { // BeforeCopyFromInsert, if set, will be called during a COPY FROM insert statement. BeforeCopyFromInsert func() error + + // ForceSQLLivenessSession will force the use of a sqlliveness session for + // transaction deadlines even in the system tenant. + ForceSQLLivenessSession bool } // PGWireTestingKnobs contains knobs for the pgwire module. diff --git a/pkg/sql/execinfra/BUILD.bazel b/pkg/sql/execinfra/BUILD.bazel index 6e318108c23e..1771517d12bc 100644 --- a/pkg/sql/execinfra/BUILD.bazel +++ b/pkg/sql/execinfra/BUILD.bazel @@ -51,6 +51,7 @@ go_library( "//pkg/sql/catalog/typedesc", "//pkg/sql/evalcatalog", "//pkg/sql/execinfrapb", + "//pkg/sql/pgwire/pgerror", "//pkg/sql/rowenc", "//pkg/sql/rowenc/valueside", "//pkg/sql/rowinfra", diff --git a/pkg/sql/execinfra/errors.go b/pkg/sql/execinfra/errors.go index 022c59f2a598..8c6e7326f6c6 100644 --- a/pkg/sql/execinfra/errors.go +++ b/pkg/sql/execinfra/errors.go @@ -10,7 +10,10 @@ package execinfra -import "github.com/cockroachdb/errors" +import ( + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/errors" +) // QueryNotRunningInHomeRegionMessagePrefix is the common message prefix for // erroring out queries with no home region when the enforce_home_region session @@ -29,6 +32,8 @@ func NewDynamicQueryHasNoHomeRegionError(err error) error { return &dynamicQueryHasNoHomeRegionError{err: err} } +var _ pgerror.ClientVisibleRetryError = (*dynamicQueryHasNoHomeRegionError)(nil) + // dynamicQueryHasNoHomeRegionError implements the error interface. func (e *dynamicQueryHasNoHomeRegionError) Error() string { return e.err.Error() diff --git a/pkg/sql/schemachanger/scerrors/BUILD.bazel b/pkg/sql/schemachanger/scerrors/BUILD.bazel index 5b62de479367..6eaaea5e5657 100644 --- a/pkg/sql/schemachanger/scerrors/BUILD.bazel +++ b/pkg/sql/schemachanger/scerrors/BUILD.bazel @@ -9,6 +9,7 @@ go_library( deps = [ "//pkg/sql/catalog", "//pkg/sql/catalog/descpb", + "//pkg/sql/pgwire/pgerror", "//pkg/sql/sem/tree", "//pkg/util/log", "//pkg/util/timeutil", diff --git a/pkg/sql/schemachanger/scerrors/errors.go b/pkg/sql/schemachanger/scerrors/errors.go index 3c70ae965646..f40bac8b6519 100644 --- a/pkg/sql/schemachanger/scerrors/errors.go +++ b/pkg/sql/schemachanger/scerrors/errors.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -130,6 +131,8 @@ type concurrentSchemaChangeError struct { descID descpb.ID } +var _ pgerror.ClientVisibleRetryError = (*concurrentSchemaChangeError)(nil) + // ClientVisibleRetryError is detected by the pgwire layer and will convert // this error into a serialization error to be retried. See // pgcode.ClientVisibleRetryError.