diff --git a/pkg/cmd/github-pull-request-make/main.go b/pkg/cmd/github-pull-request-make/main.go index 41d08c6178e7..2204a5dc2f12 100644 --- a/pkg/cmd/github-pull-request-make/main.go +++ b/pkg/cmd/github-pull-request-make/main.go @@ -237,7 +237,11 @@ func main() { // to strip out the unnecessary calls to `bazel`, but that might // better be saved for when we no longer need `make` support and // don't have to worry about accidentally breaking it. - out, err := exec.Command("bazel", "query", fmt.Sprintf("kind(go_test, //%s:all)", name), "--output=label").Output() + out, err := exec.Command( + "bazel", + "query", + fmt.Sprintf("kind(go_test, //%s:all) except attr(tags, \"integration\", //%s:all)", name, name), + "--output=label").Output() if err != nil { log.Fatal(err) } diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index dfafaedccefd..ed72f6450f65 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -806,6 +806,7 @@ go_test( "//pkg/sql/sessionphase", "//pkg/sql/sqlinstance", "//pkg/sql/sqlliveness", + "//pkg/sql/sqlliveness/sqllivenesstestutils", "//pkg/sql/sqlstats", "//pkg/sql/sqltestutils", "//pkg/sql/stats", diff --git a/pkg/sql/catalog/descs/BUILD.bazel b/pkg/sql/catalog/descs/BUILD.bazel index 883e5e32f46a..a7e544342dba 100644 --- a/pkg/sql/catalog/descs/BUILD.bazel +++ b/pkg/sql/catalog/descs/BUILD.bazel @@ -117,18 +117,24 @@ go_test( "//pkg/sql/sem/catid", "//pkg/sql/sem/tree", "//pkg/sql/sessiondata", + "//pkg/sql/sqlliveness", + "//pkg/sql/sqlliveness/sqllivenesstestutils", "//pkg/sql/tests", "//pkg/sql/types", "//pkg/testutils/datapathutils", "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", + "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/mon", "//pkg/util/randutil", + "//pkg/util/syncutil", + "@com_github_cockroachdb_cockroach_go_v2//crdb", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", + 
"@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", "@com_github_lib_pq//oid", "@com_github_stretchr_testify//require", diff --git a/pkg/sql/catalog/descs/leased_descriptors.go b/pkg/sql/catalog/descs/leased_descriptors.go index 24c0e22330ad..32933a1084f4 100644 --- a/pkg/sql/catalog/descs/leased_descriptors.go +++ b/pkg/sql/catalog/descs/leased_descriptors.go @@ -12,6 +12,7 @@ package descs import ( "context" + "fmt" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" @@ -198,16 +199,7 @@ func (ld *leasedDescriptors) maybeUpdateDeadline( // expiration as the deadline will serve a purpose. var deadline hlc.Timestamp if session != nil { - if expiration, txnTS := session.Expiration(), txn.ReadTimestamp(); txnTS.Less(expiration) { - deadline = expiration - } else { - // If the session has expired relative to this transaction, propagate - // a clear error that that's what is going on. - return errors.Errorf( - "liveness session expired %s before transaction", - txnTS.GoTime().Sub(expiration.GoTime()), - ) - } + deadline = session.Expiration() } if leaseDeadline, ok := ld.getDeadline(); ok && (deadline.IsEmpty() || leaseDeadline.Less(deadline)) { // Set the deadline to the lease deadline if session expiration is empty @@ -216,11 +208,40 @@ func (ld *leasedDescriptors) maybeUpdateDeadline( } // If the deadline has been set, update the transaction deadline. if !deadline.IsEmpty() { + // If the deadline certainly cannot be met, return an error which will + // be retried explicitly. + if txnTs := txn.ReadTimestamp(); deadline.LessEq(txnTs) { + return &deadlineExpiredError{ + txnTS: txnTs, + expiration: deadline, + } + } return txn.UpdateDeadline(ctx, deadline) } return nil } +// deadlineExpiredError is returned when the deadline from either a descriptor +// lease or a sqlliveness session is before the current transaction timestamp. +// The error is a user-visible retry. 
+type deadlineExpiredError struct { + txnTS, expiration hlc.Timestamp +} + +func (e *deadlineExpiredError) SafeFormatError(p errors.Printer) (next error) { + p.Printf("liveness session expired %v before transaction", + e.txnTS.GoTime().Sub(e.expiration.GoTime())) + return nil +} + +func (e *deadlineExpiredError) ClientVisibleRetryError() {} + +func (e *deadlineExpiredError) Error() string { + return fmt.Sprint(errors.Formattable(e)) +} + +var _ errors.SafeFormatter = (*deadlineExpiredError)(nil) + func (ld *leasedDescriptors) getDeadline() (deadline hlc.Timestamp, haveDeadline bool) { _ = ld.cache.IterateByID(func(descriptor catalog.NameEntry) error { expiration := descriptor.(lease.LeasedDescriptor).Expiration() diff --git a/pkg/sql/catalog/descs/txn_external_test.go b/pkg/sql/catalog/descs/txn_external_test.go index 89b00e29dccd..aff7073b9f8b 100644 --- a/pkg/sql/catalog/descs/txn_external_test.go +++ b/pkg/sql/catalog/descs/txn_external_test.go @@ -12,15 +12,25 @@ package descs_test import ( "context" + gosql "database/sql" + "math/rand" "testing" + "github.com/cockroachdb/cockroach-go/v2/crdb" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" "github.com/stretchr/testify/require" ) @@ -69,3 +79,89 @@ func TestTxnWithStepping(t *testing.T) { return nil }, isql.SteppingEnabled())) } + +// 
TestLivenessSessionExpiredErrorResultsInRestartAtSQLLayer ensures that if +// a transaction sees a deadline which is in the past of the current +// transaction timestamp, it'll propagate a retriable error. This should only +// happen in transient situations. +// +// Note that the sqlliveness session is not the only source of a deadline. In +// fact, it's probably more common that schema changes lead to descriptor +// deadlines kicking in. By default, at time of writing, sqlliveness sessions +// only apply to secondary tenants. The test leverages them because they are +// the easiest way to interact with the deadline at a session level. +func TestLivenessSessionExpiredErrorResultsInRestartAtSQLLayer(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var ( + toFail = struct { + syncutil.Mutex + remaining int + }{} + shouldFail = func() bool { + toFail.Lock() + defer toFail.Unlock() + if toFail.remaining > 0 { + toFail.remaining-- + return true + } + return false + } + checkFailed = func(t *testing.T) { + toFail.Lock() + defer toFail.Unlock() + require.Zero(t, toFail.remaining) + } + setRemaining = func(t *testing.T, n int) { + toFail.Lock() + defer toFail.Unlock() + toFail.remaining = n + } + ) + s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + SQLExecutor: &sql.ExecutorTestingKnobs{ + ForceSQLLivenessSession: true, + }, + SQLLivenessKnobs: &sqlliveness.TestingKnobs{ + SessionOverride: func(ctx context.Context) (sqlliveness.Session, error) { + // Only client sessions have the client tag. We control the only + // client session. + tags := logtags.FromContext(ctx) + if _, hasClient := tags.GetTag("client"); hasClient && shouldFail() { + // This fake session is certainly expired. 
+ return &sqllivenesstestutils.FakeSession{ + ExpTS: hlc.Timestamp{WallTime: 1}, + }, nil + } + return nil, nil + }, + }, + }, + }) + defer s.Stopper().Stop(ctx) + + tdb := sqlutils.MakeSQLRunner(sqlDB) + tdb.Exec(t, "CREATE TABLE t (i INT PRIMARY KEY)") + + // Ensure that the internal retry works seamlessly + t.Run("auto-retry", func(t *testing.T) { + setRemaining(t, rand.Intn(20)) + tdb.Exec(t, `SELECT * FROM t`) + checkFailed(t) + }) + t.Run("explicit transaction", func(t *testing.T) { + setRemaining(t, rand.Intn(20)) + require.NoError(t, crdb.ExecuteTx(ctx, sqlDB, nil, func(tx *gosql.Tx) error { + _, err := tx.Exec("SELECT * FROM t") + if err != nil { + return err + } + _, err = tx.Exec(`INSERT INTO t VALUES (1)`) + return err + })) + checkFailed(t) + }) +} diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index ef8a0a0721df..57944392be2a 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -3102,12 +3102,15 @@ var retriableMinTimestampBoundUnsatisfiableError = errors.Newf( "retriable MinTimestampBoundUnsatisfiableError", ) +// errIsRetriable is true if the error is a client-visible retry error +// or the error is a special error that is handled internally and retried. func errIsRetriable(err error) bool { - return errors.HasType(err, (*kvpb.TransactionRetryWithProtoRefreshError)(nil)) || - scerrors.ConcurrentSchemaChangeDescID(err) != descpb.InvalidID || + return errors.HasInterface(err, (*pgerror.ClientVisibleRetryError)(nil)) || errors.Is(err, retriableMinTimestampBoundUnsatisfiableError) || + // Note that this error is not handled internally and can make it to the + // client in implicit transactions. This is not great; it should + // be marked as a client visible retry error. 
errors.Is(err, descidgen.ErrDescIDSequenceMigrationInProgress) || - execinfra.IsDynamicQueryHasNoHomeRegionError(err) || descs.IsTwoVersionInvariantViolationError(err) } @@ -3517,18 +3520,8 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( // Session is considered active when executing a transaction. ex.totalActiveTimeStopWatch.Start() - if !ex.server.cfg.Codec.ForSystemTenant() { - // Update the leased descriptor collection with the current sqlliveness.Session. - // This is required in the multi-tenant environment to update the transaction - // deadline to either the session expiry or the leased descriptor deadline, - // whichever is sooner. We need this to ensure that transactions initiated - // by ephemeral SQL pods in multi-tenant environments are committed before the - // session expires. - session, err := ex.server.cfg.SQLLiveness.Session(ex.Ctx()) - if err != nil { - return advanceInfo{}, err - } - ex.extraTxnState.descCollection.SetSession(session) + if err := ex.maybeSetSQLLivenessSession(); err != nil { + return advanceInfo{}, err } case txnCommit: if res.Err() != nil { @@ -3605,8 +3598,15 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( } fallthrough - case txnRestart, txnRollback: + case txnRollback: ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent) + case txnRestart: + // In addition to resetting the extraTxnState, the restart event may + // also need to reset the sqlliveness.Session. 
+ ex.resetExtraTxnState(ex.Ctx(), advInfo.txnEvent) + if err := ex.maybeSetSQLLivenessSession(); err != nil { + return advanceInfo{}, err + } default: return advanceInfo{}, errors.AssertionFailedf( "unexpected event: %v", errors.Safe(advInfo.txnEvent)) @@ -3614,6 +3614,24 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper( return advInfo, nil } +func (ex *connExecutor) maybeSetSQLLivenessSession() error { + if !ex.server.cfg.Codec.ForSystemTenant() || + ex.server.cfg.TestingKnobs.ForceSQLLivenessSession { + // Update the leased descriptor collection with the current sqlliveness.Session. + // This is required in the multi-tenant environment to update the transaction + // deadline to either the session expiry or the leased descriptor deadline, + // whichever is sooner. We need this to ensure that transactions initiated + // by ephemeral SQL pods in multi-tenant environments are committed before the + // session expires. + session, err := ex.server.cfg.SQLLiveness.Session(ex.Ctx()) + if err != nil { + return err + } + ex.extraTxnState.descCollection.SetSession(session) + } + return nil +} + func (ex *connExecutor) handleWaitingForConcurrentSchemaChanges( ctx context.Context, descID descpb.ID, ) error { diff --git a/pkg/sql/conn_executor_test.go b/pkg/sql/conn_executor_test.go index 9464b26341ab..301ae303a785 100644 --- a/pkg/sql/conn_executor_test.go +++ b/pkg/sql/conn_executor_test.go @@ -42,6 +42,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/pgtest" @@ -1211,12 +1212,15 @@ CREATE DATABASE t1; CREATE TABLE t1.test (k INT PRIMARY KEY, v TEXT); `) + type fakeSession = sqllivenesstestutils.FakeSession 
t.Run("session_expiry_overrides_lease_deadline", func(t *testing.T) { // Deliberately set the sessionDuration to be less than the lease duration // to confirm that the sessionDuration overrides the lease duration while // setting the transaction deadline. sessionDuration := base.DefaultDescriptorLeaseDuration - time.Minute - fs := fakeSession{exp: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0)} + fs := fakeSession{ + ExpTS: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0), + } defer setClientSessionOverride(&fs)() txn, err := sqlConn.Begin() @@ -1239,7 +1243,9 @@ CREATE TABLE t1.test (k INT PRIMARY KEY, v TEXT); // to confirm that the lease duration overrides the session duration while // setting the transaction deadline sessionDuration := base.DefaultDescriptorLeaseDuration + time.Minute - fs := fakeSession{exp: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0)} + fs := fakeSession{ + ExpTS: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0), + } defer setClientSessionOverride(&fs)() txn, err := sqlConn.Begin() @@ -1258,22 +1264,6 @@ CREATE TABLE t1.test (k INT PRIMARY KEY, v TEXT); locked(func() { require.True(t, mu.txnDeadline.Less(fs.Expiration())) }) }) - t.Run("expired session leads to clear error", func(t *testing.T) { - // In this test we use an intentionally expired session in the tenant - // and observe that we get a clear error indicating that the session - // was expired. 
- sessionDuration := -time.Nanosecond - fs := fakeSession{exp: s.Clock().Now().Add(sessionDuration.Nanoseconds(), 0)} - defer setClientSessionOverride(&fs)() - txn, err := sqlConn.Begin() - if err != nil { - t.Fatal(err) - } - _, err = txn.ExecContext(ctx, "UPSERT INTO t1.test(k, v) VALUES (1, 'abc')") - require.Regexp(t, `liveness session expired (\S+) before transaction`, err) - require.NoError(t, txn.Rollback()) - }) - t.Run("single_tenant_ignore_session_expiry", func(t *testing.T) { // In this test, we check that the session expiry is ignored in a single-tenant // environment. To verify this, we deliberately set the session duration to be @@ -1293,7 +1283,9 @@ CREATE TABLE t1.test (k INT PRIMARY KEY, v TEXT); } // Inject an already expired session to observe that it has no effect. - fs := &fakeSession{exp: s.Clock().Now().Add(-time.Minute.Nanoseconds(), 0)} + fs := &fakeSession{ + ExpTS: s.Clock().Now().Add(-time.Minute.Nanoseconds(), 0), + } defer setClientSessionOverride(fs)() txn, err := dbConn.Begin() if err != nil { @@ -1855,14 +1847,6 @@ func noopRequestFilter(ctx context.Context, request *kvpb.BatchRequest) *kvpb.Er return nil } -type fakeSession struct{ exp hlc.Timestamp } - -func (f fakeSession) ID() sqlliveness.SessionID { return "foo" } -func (f fakeSession) Expiration() hlc.Timestamp { return f.exp } -func (f fakeSession) Start() hlc.Timestamp { panic("unimplemented") } - -var _ sqlliveness.Session = (*fakeSession)(nil) - func getTxnID(t *testing.T, tx *gosql.Tx) (id string) { t.Helper() sqlutils.MakeSQLRunner(tx).QueryRow(t, ` diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index a899ef84f888..7d83572f2076 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -576,11 +576,9 @@ func (dsp *DistSQLPlanner) setupFlows( // and we do so in a separate goroutine. 
// // We need to synchronize the new goroutine with flow.Cleanup() being called - // for two reasons: - // - flow.Cleanup() is the last thing before DistSQLPlanner.Run returns at - // which point the rowResultWriter is no longer protected by the mutex of - // the DistSQLReceiver - // - flow.Cancel can only be called before flow.Cleanup. + // since flow.Cleanup() is the last thing before DistSQLPlanner.Run returns + // at which point the rowResultWriter is no longer protected by the mutex of + // the DistSQLReceiver. cleanupCalledMu := struct { syncutil.Mutex called bool @@ -608,8 +606,6 @@ func (dsp *DistSQLPlanner) setupFlows( seenError = true func() { cleanupCalledMu.Lock() - // Flow.Cancel cannot be called after or concurrently with - // Flow.Cleanup. defer cleanupCalledMu.Unlock() if cleanupCalledMu.called { // Cleanup of the local flow has already been performed, diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index fe4fd1cbed3d..208a9042e8c3 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1595,6 +1595,10 @@ type ExecutorTestingKnobs struct { // BeforeCopyFromInsert, if set, will be called during a COPY FROM insert statement. BeforeCopyFromInsert func() error + + // ForceSQLLivenessSession will force the use of a sqlliveness session for + // transaction deadlines even in the system tenant. + ForceSQLLivenessSession bool } // PGWireTestingKnobs contains knobs for the pgwire module. 
diff --git a/pkg/sql/execinfra/BUILD.bazel b/pkg/sql/execinfra/BUILD.bazel index 6e318108c23e..1771517d12bc 100644 --- a/pkg/sql/execinfra/BUILD.bazel +++ b/pkg/sql/execinfra/BUILD.bazel @@ -51,6 +51,7 @@ go_library( "//pkg/sql/catalog/typedesc", "//pkg/sql/evalcatalog", "//pkg/sql/execinfrapb", + "//pkg/sql/pgwire/pgerror", "//pkg/sql/rowenc", "//pkg/sql/rowenc/valueside", "//pkg/sql/rowinfra", diff --git a/pkg/sql/execinfra/errors.go b/pkg/sql/execinfra/errors.go index 022c59f2a598..8c6e7326f6c6 100644 --- a/pkg/sql/execinfra/errors.go +++ b/pkg/sql/execinfra/errors.go @@ -10,7 +10,10 @@ package execinfra -import "github.com/cockroachdb/errors" +import ( + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/errors" +) // QueryNotRunningInHomeRegionMessagePrefix is the common message prefix for // erroring out queries with no home region when the enforce_home_region session @@ -29,6 +32,8 @@ func NewDynamicQueryHasNoHomeRegionError(err error) error { return &dynamicQueryHasNoHomeRegionError{err: err} } +var _ pgerror.ClientVisibleRetryError = (*dynamicQueryHasNoHomeRegionError)(nil) + // dynamicQueryHasNoHomeRegionError implements the error interface. 
func (e *dynamicQueryHasNoHomeRegionError) Error() string { return e.err.Error() diff --git a/pkg/sql/flowinfra/BUILD.bazel b/pkg/sql/flowinfra/BUILD.bazel index ef44773a2e7c..e28b4e3721dc 100644 --- a/pkg/sql/flowinfra/BUILD.bazel +++ b/pkg/sql/flowinfra/BUILD.bazel @@ -36,6 +36,7 @@ go_library( "//pkg/sql/types", "//pkg/util/admission", "//pkg/util/admission/admissionpb", + "//pkg/util/buildutil", "//pkg/util/cancelchecker", "//pkg/util/contextutil", "//pkg/util/log", diff --git a/pkg/sql/flowinfra/flow.go b/pkg/sql/flowinfra/flow.go index 6a14ccda81d0..c8944fdad9ab 100644 --- a/pkg/sql/flowinfra/flow.go +++ b/pkg/sql/flowinfra/flow.go @@ -24,10 +24,12 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" "github.com/cockroachdb/cockroach/pkg/util/admission" "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/buildutil" "github.com/cockroachdb/cockroach/pkg/util/cancelchecker" "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/optional" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/cockroachdb/errors" @@ -151,8 +153,7 @@ type Flow interface { MemUsage() int64 // Cancel cancels the flow by canceling its context. Safe to be called from - // any goroutine but **cannot** be called after (or concurrently with) - // Cleanup. + // any goroutine. Cancel() // AddOnCleanupStart adds a callback to be executed at the very beginning of @@ -232,12 +233,19 @@ type FlowBase struct { statementSQL string - status flowStatus + mu struct { + syncutil.Mutex + status flowStatus + // Cancel function for ctx. Call this to cancel the flow (safe to be + // called multiple times). + // + // NB: must be used with care as this function should **not** be called + // once the Flow has been cleaned up. 
Consider using Flow.Cancel + // instead when unsure. + ctxCancel context.CancelFunc + } - // Cancel function for ctx. Call this to cancel the flow (safe to be called - // multiple times). - ctxCancel context.CancelFunc - ctxDone <-chan struct{} + ctxDone <-chan struct{} // sp is the span that this Flow runs in. Can be nil if no span was created // for the flow. Flow.Cleanup() finishes it. @@ -249,11 +257,23 @@ type FlowBase struct { admissionInfo admission.WorkInfo } +func (f *FlowBase) getStatus() flowStatus { + f.mu.Lock() + defer f.mu.Unlock() + return f.mu.status +} + +func (f *FlowBase) setStatus(status flowStatus) { + f.mu.Lock() + defer f.mu.Unlock() + f.mu.status = status +} + // Setup is part of the Flow interface. func (f *FlowBase) Setup( ctx context.Context, spec *execinfrapb.FlowSpec, _ FuseOpt, ) (context.Context, execopnode.OpChains, error) { - ctx, f.ctxCancel = contextutil.WithCancel(ctx) + ctx, f.mu.ctxCancel = contextutil.WithCancel(ctx) f.ctxDone = ctx.Done() f.spec = spec return ctx, nil, nil @@ -288,7 +308,7 @@ func (f *FlowBase) SetStartedGoroutines(val bool) { // Started returns true if f has either been Run() or Start()ed. func (f *FlowBase) Started() bool { - return f.status != flowNotStarted + return f.getStatus() != flowNotStarted } var _ Flow = &FlowBase{} @@ -331,7 +351,6 @@ func NewFlowBase( localVectorSources: localVectorSources, admissionInfo: admissionInfo, onCleanupEnd: onFlowCleanupEnd, - status: flowNotStarted, statementSQL: statementSQL, } } @@ -380,9 +399,10 @@ func (f *FlowBase) GetCtxDone() <-chan struct{} { } // GetCancelFlowFn returns the context cancellation function of the context of -// this flow. +// this flow. The returned function is only safe to be used before Flow.Cleanup +// has been called. 
func (f *FlowBase) GetCancelFlowFn() context.CancelFunc { - return f.ctxCancel + return f.mu.ctxCancel } // SetProcessorsAndOutputs overrides the current f.processors and f.outputs with @@ -466,7 +486,7 @@ func (f *FlowBase) StartInternal( } } - f.status = flowRunning + f.setStatus(flowRunning) if multitenant.TenantRUEstimateEnabled.Get(&f.Cfg.Settings.SV) && !f.Gateway && f.CollectStats { @@ -479,7 +499,10 @@ func (f *FlowBase) StartInternal( log.Infof(ctx, "registered flow %s", f.ID.Short()) } for _, s := range f.startables { - s.Start(ctx, &f.waitGroup, f.ctxCancel) + // Note that it is safe to pass the context cancellation function + // directly since the main goroutine of the Flow will block until all + // startable goroutines exit. + s.Start(ctx, &f.waitGroup, f.mu.ctxCancel) } for i := 0; i < len(processors); i++ { f.waitGroup.Add(1) @@ -561,9 +584,13 @@ func (f *FlowBase) Wait() { var panicVal interface{} if panicVal = recover(); panicVal != nil { // If Wait is called as part of stack unwinding during a panic, the flow - // context must be canceled to ensure that all asynchronous goroutines get - // the message that they must exit (otherwise we will wait indefinitely). - f.ctxCancel() + // context must be canceled to ensure that all asynchronous goroutines + // get the message that they must exit (otherwise we will wait + // indefinitely). + // + // Cleanup is only called _after_ Wait, so it's safe to use ctxCancel + // directly. + f.mu.ctxCancel() } waitChan := make(chan struct{}) @@ -593,7 +620,13 @@ func (f *FlowBase) MemUsage() int64 { // Cancel is part of the Flow interface. func (f *FlowBase) Cancel() { - f.ctxCancel() + f.mu.Lock() + defer f.mu.Unlock() + if f.mu.status == flowFinished { + // The Flow is already done, nothing to cancel. + return + } + f.mu.ctxCancel() } // AddOnCleanupStart is part of the Flow interface. 
@@ -624,11 +657,13 @@ func (f *FlowBase) GetOnCleanupFns() (startCleanup, endCleanup func()) { } // Cleanup is part of the Flow interface. -// NOTE: this implements only the shared clean up logic between row-based and +// NOTE: this implements only the shared cleanup logic between row-based and // vectorized flows. func (f *FlowBase) Cleanup(ctx context.Context) { - if f.status == flowFinished { - panic("flow cleanup called twice") + if buildutil.CrdbTestBuild { + if f.getStatus() == flowFinished { + panic("flow cleanup called twice") + } } // Release any descriptors accessed by this flow. @@ -675,8 +710,10 @@ func (f *FlowBase) Cleanup(ctx context.Context) { if !f.IsLocal() && f.Started() { f.flowRegistry.UnregisterFlow(f.ID) } - f.status = flowFinished - f.ctxCancel() + // Importantly, we must mark the Flow as finished before f.sp is finished in + // the defer above. + f.setStatus(flowFinished) + f.mu.ctxCancel() } // cancel cancels all unconnected streams of this flow. This function is called diff --git a/pkg/sql/flowinfra/flow_registry.go b/pkg/sql/flowinfra/flow_registry.go index f262133a604a..dcfa782987cc 100644 --- a/pkg/sql/flowinfra/flow_registry.go +++ b/pkg/sql/flowinfra/flow_registry.go @@ -462,7 +462,7 @@ func (fr *FlowRegistry) Drain( // f.flow might be nil when ConnectInboundStream() was // called, but the consumer of that inbound stream hasn't // been scheduled yet. - f.flow.ctxCancel() + f.flow.Cancel() } } fr.Unlock() @@ -583,7 +583,7 @@ func (fr *FlowRegistry) ConnectInboundStream( // query execution will fail, so we cancel the flow on this node. If // this node is the gateway, this might actually be required for // proper shutdown of the whole distributed plan. 
- flow.ctxCancel() + flow.Cancel() } }() diff --git a/pkg/sql/flowinfra/flow_registry_test.go b/pkg/sql/flowinfra/flow_registry_test.go index 4a2ed8670bbb..0468c7afeb51 100644 --- a/pkg/sql/flowinfra/flow_registry_test.go +++ b/pkg/sql/flowinfra/flow_registry_test.go @@ -212,7 +212,8 @@ func TestStreamConnectionTimeout(t *testing.T) { // Register a flow with a very low timeout. After it times out, we'll attempt // to connect a stream, but it'll be too late. id1 := execinfrapb.FlowID{UUID: uuid.MakeV4()} - f1 := &FlowBase{ctxCancel: func() {}} + f1 := &FlowBase{} + f1.mu.ctxCancel = func() {} streamID1 := execinfrapb.StreamID(1) consumer := &distsqlutils.RowBuffer{} wg := &sync.WaitGroup{} @@ -370,7 +371,8 @@ func TestFlowRegistryDrain(t *testing.T) { ctx := context.Background() reg := NewFlowRegistry() - flow := &FlowBase{ctxCancel: func() {}} + flow := &FlowBase{} + flow.mu.ctxCancel = func() {} id := execinfrapb.FlowID{UUID: uuid.MakeV4()} registerFlow := func(t *testing.T, id execinfrapb.FlowID) { t.Helper() @@ -706,9 +708,10 @@ func TestErrorOnSlowHandshake(t *testing.T) { flowID := execinfrapb.FlowID{UUID: uuid.MakeV4()} streamID := execinfrapb.StreamID(1) cancelCh := make(chan struct{}) - f := &FlowBase{ctxCancel: func() { + f := &FlowBase{} + f.mu.ctxCancel = func() { cancelCh <- struct{}{} - }} + } serverStream, _ /* clientStream */, cleanup := createDummyStream(t) defer cleanup() diff --git a/pkg/sql/schemachanger/scerrors/BUILD.bazel b/pkg/sql/schemachanger/scerrors/BUILD.bazel index 5b62de479367..6eaaea5e5657 100644 --- a/pkg/sql/schemachanger/scerrors/BUILD.bazel +++ b/pkg/sql/schemachanger/scerrors/BUILD.bazel @@ -9,6 +9,7 @@ go_library( deps = [ "//pkg/sql/catalog", "//pkg/sql/catalog/descpb", + "//pkg/sql/pgwire/pgerror", "//pkg/sql/sem/tree", "//pkg/util/log", "//pkg/util/timeutil", diff --git a/pkg/sql/schemachanger/scerrors/errors.go b/pkg/sql/schemachanger/scerrors/errors.go index 3c70ae965646..f40bac8b6519 100644 --- 
a/pkg/sql/schemachanger/scerrors/errors.go +++ b/pkg/sql/schemachanger/scerrors/errors.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" @@ -130,6 +131,8 @@ type concurrentSchemaChangeError struct { descID descpb.ID } +var _ pgerror.ClientVisibleRetryError = (*concurrentSchemaChangeError)(nil) + // ClientVisibleRetryError is detected by the pgwire layer and will convert // this error into a serialization error to be retried. See // pgcode.ClientVisibleRetryError. diff --git a/pkg/sql/sqlliveness/sqllivenesstestutils/BUILD.bazel b/pkg/sql/sqlliveness/sqllivenesstestutils/BUILD.bazel index d60d183bca40..e30a5fb165e0 100644 --- a/pkg/sql/sqlliveness/sqllivenesstestutils/BUILD.bazel +++ b/pkg/sql/sqlliveness/sqllivenesstestutils/BUILD.bazel @@ -3,7 +3,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "sqllivenesstestutils", - srcs = ["alwaysalivesession.go"], + srcs = [ + "alwaysalivesession.go", + "fake_session.go", + ], importpath = "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/sqllivenesstestutils", visibility = ["//visibility:public"], deps = [ diff --git a/pkg/sql/sqlliveness/sqllivenesstestutils/fake_session.go b/pkg/sql/sqlliveness/sqllivenesstestutils/fake_session.go new file mode 100644 index 000000000000..e084f07e2058 --- /dev/null +++ b/pkg/sql/sqlliveness/sqllivenesstestutils/fake_session.go @@ -0,0 +1,34 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sqllivenesstestutils + +import ( + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/util/hlc" +) + +// FakeSession is an implementation of sqlliveness.Session for testing. +type FakeSession struct { + SessionID sqlliveness.SessionID + ExpTS hlc.Timestamp + StartTS hlc.Timestamp +} + +var _ sqlliveness.Session = (*FakeSession)(nil) + +// ID returns f.SessionID. +func (f *FakeSession) ID() sqlliveness.SessionID { return f.SessionID } + +// Expiration returns f.ExpTS. +func (f *FakeSession) Expiration() hlc.Timestamp { return f.ExpTS } + +// Start returns f.StartTS. +func (f *FakeSession) Start() hlc.Timestamp { return f.StartTS }