diff --git a/pkg/kv/txn.go b/pkg/kv/txn.go
index 7f0914a5bc33..f0c108f2220a 100644
--- a/pkg/kv/txn.go
+++ b/pkg/kv/txn.go
@@ -993,7 +993,8 @@ func (txn *Txn) PrepareForRetry(ctx context.Context) {
 	}
 	log.VEventf(ctx, 2, "retrying transaction: %s because of a retryable error: %s",
 		txn.debugNameLocked(), retryErr)
-	txn.handleRetryableErrLocked(ctx, retryErr)
+	txn.resetDeadlineLocked()
+	txn.replaceRootSenderIfTxnAbortedLocked(ctx, retryErr, retryErr.TxnID)
 }
 
 // IsRetryableErrMeantForTxn returns true if err is a retryable
@@ -1073,13 +1074,6 @@ func (txn *Txn) Send(
 	return br, pErr
 }
 
-func (txn *Txn) handleRetryableErrLocked(
-	ctx context.Context, retryErr *kvpb.TransactionRetryWithProtoRefreshError,
-) {
-	txn.resetDeadlineLocked()
-	txn.replaceRootSenderIfTxnAbortedLocked(ctx, retryErr, retryErr.TxnID)
-}
-
 // NegotiateAndSend is a specialized version of Send that is capable of
 // orchestrating a bounded-staleness read through the transaction, given a
 // read-only BatchRequest with a min_timestamp_bound set in its Header.
@@ -1251,10 +1245,11 @@ func (txn *Txn) GetLeafTxnInputState(ctx context.Context) *roachpb.LeafTxnInputS
 
 // GetLeafTxnInputStateOrRejectClient is like GetLeafTxnInputState
 // except, if the transaction is already aborted or otherwise in state
-// that cannot make progress, it returns an error. If the transaction
-// is aborted, the error will be a retryable one, and the transaction
-// will have been prepared for another transaction attempt (so, on
-// retryable errors, it acts like Send()).
+// that cannot make progress, it returns an error. If the transaction is
+// aborted, the error returned will be a retryable one; as such, the caller is
+// responsible for handling the error before another attempt by calling
+// PrepareForRetry. Use of the transaction before doing so will continue to be
+// rejected.
 func (txn *Txn) GetLeafTxnInputStateOrRejectClient(
 	ctx context.Context,
 ) (*roachpb.LeafTxnInputState, error) {
@@ -1267,10 +1262,6 @@ func (txn *Txn) GetLeafTxnInputStateOrRejectClient(
 	defer txn.mu.Unlock()
 	tfs, err := txn.mu.sender.GetLeafTxnInputState(ctx, OnlyPending)
 	if err != nil {
-		var retryErr *kvpb.TransactionRetryWithProtoRefreshError
-		if errors.As(err, &retryErr) {
-			txn.handleRetryableErrLocked(ctx, retryErr)
-		}
 		return nil, err
 	}
 	return tfs, nil
@@ -1339,8 +1330,6 @@ func (txn *Txn) UpdateStateOnRemoteRetryableErr(ctx context.Context, pErr *kvpb.
 	}
 
 	pErr = txn.mu.sender.UpdateStateOnRemoteRetryableErr(ctx, pErr)
-	txn.replaceRootSenderIfTxnAbortedLocked(ctx, pErr.GetDetail().(*kvpb.TransactionRetryWithProtoRefreshError), origTxnID)
-
 	return pErr.GoError()
 }
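Reviewer note: the comment change above shifts responsibility for handling retryable errors onto the client. Below is a minimal sketch of that caller-side contract, not part of this patch; the runAttempt closure is hypothetical, while kv.Txn, kvpb, errors.HasType, PrepareForRetry, and Commit are real APIs.

package sketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/errors"
)

// runWithRetries shows the loop a client is expected to drive: on a retryable
// error it must call PrepareForRetry before reusing txn, since the transaction
// is no longer prepared for a new attempt implicitly.
func runWithRetries(
	ctx context.Context, txn *kv.Txn, runAttempt func(context.Context, *kv.Txn) error,
) error {
	for {
		if err := runAttempt(ctx, txn); err != nil {
			if errors.HasType(err, (*kvpb.TransactionRetryWithProtoRefreshError)(nil)) {
				// Resets the deadline and swaps in a new root sender if the
				// transaction was aborted; until this runs, further use of txn
				// keeps being rejected.
				txn.PrepareForRetry(ctx)
				continue
			}
			return err
		}
		return txn.Commit(ctx)
	}
}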
diff --git a/pkg/kv/txn_external_test.go b/pkg/kv/txn_external_test.go
index fbfeccf4ccf0..7a4fe94598b0 100644
--- a/pkg/kv/txn_external_test.go
+++ b/pkg/kv/txn_external_test.go
@@ -703,3 +703,70 @@ func TestGenerateForcedRetryableErrorByPoisoning(t *testing.T) {
 	checkKey(t, "a", 1)
 	checkKey(t, "b", 2)
 }
+
+// TestUpdateStateOnRemoteRetryableErr ensures transaction state is updated and
+// a TransactionRetryWithProtoRefreshError is correctly constructed by
+// UpdateStateOnRemoteRetryableErr.
+func TestUpdateStateOnRemoteRetryableErr(t *testing.T) {
+	ctx := context.Background()
+	_, _, db := serverutils.StartServer(t, base.TestServerArgs{})
+
+	testCases := []struct {
+		err         *kvpb.Error
+		epochBumped bool // if we expect the epoch to be bumped
+		newTxn      bool // if we expect a new transaction in the returned error; implies an ABORT
+	}{
+		{
+			err:         kvpb.NewError(&kvpb.ReadWithinUncertaintyIntervalError{}),
+			epochBumped: true,
+			newTxn:      false,
+		},
+		{
+			err:         kvpb.NewError(&kvpb.TransactionAbortedError{}),
+			epochBumped: false,
+			newTxn:      true,
+		},
+		{
+			err:         kvpb.NewError(&kvpb.TransactionPushError{}),
+			epochBumped: true,
+			newTxn:      false,
+		},
+		{
+			err:         kvpb.NewError(&kvpb.TransactionRetryError{}),
+			epochBumped: true,
+			newTxn:      false,
+		},
+		{
+			err:         kvpb.NewError(&kvpb.WriteTooOldError{}),
+			epochBumped: true,
+			newTxn:      false,
+		},
+		// TODO(arul): IntentMissingError currently causes a fatal in
+		// PrepareTransactionForRetry. File an issue and fix this case.
+		//{
+		//	err:         kvpb.NewError(&kvpb.IntentMissingError{}),
+		//	epochBumped: true,
+		//	newTxn:      false,
+		//},
+	}
+
+	for _, tc := range testCases {
+		txn := db.NewTxn(ctx, "test")
+		pErr := tc.err
+		pErr.SetTxn(txn.Sender().TestingCloneTxn())
+		epochBefore := txn.Epoch()
+		txnIDBefore := txn.ID()
+		err := txn.UpdateStateOnRemoteRetryableErr(ctx, pErr)
+		require.IsType(t, &kvpb.TransactionRetryWithProtoRefreshError{}, err)
+		retErr := txn.Sender().GetTxnRetryableErr(ctx)
+		require.Equal(t, retErr, err)
+		if tc.epochBumped {
+			require.Greater(t, txn.Epoch(), epochBefore)
+			require.Equal(t, retErr.TxnID, txnIDBefore)
+		}
+		if tc.newTxn {
+			require.NotEqual(t, retErr.Transaction.ID, txnIDBefore)
+			require.Equal(t, txn.Sender().TxnStatus(), roachpb.ABORTED)
+		}
+	}
+}
diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go
index eb07b2b32e45..9427ce2a6c24 100644
--- a/pkg/sql/distsql_running.go
+++ b/pkg/sql/distsql_running.go
@@ -640,6 +640,7 @@ func (dsp *DistSQLPlanner) setupFlows(
 }
 
 const clientRejectedMsg string = "client rejected when attempting to run DistSQL plan"
+const executingParallelAndSerialChecks = "executing %d checks concurrently and %d checks serially"
 
 // Run executes a physical plan. The plan should have been finalized using
 // FinalizePlan.
@@ -1582,6 +1583,10 @@ func (dsp *DistSQLPlanner) PlanAndRunAll(
 		return recv.commErr
 	}
 
+	if fn := evalCtx.ExecCfg.DistSQLRunTestingKnobs.RunBeforeCascadesAndChecks; fn != nil {
+		fn(planner.Txn().ID())
+	}
+
 	dsp.PlanAndRunCascadesAndChecks(
 		ctx, planner, evalCtxFactory, &planner.curPlan.planComponents, recv,
 	)
@@ -2212,7 +2217,9 @@ func (dsp *DistSQLPlanner) planAndRunChecksInParallel(
 		numParallelChecks--
 	}
 
-	log.VEventf(ctx, 2, "executing %d checks concurrently and %d checks serially", numParallelChecks, len(checkPlans)-numParallelChecks)
+	log.VEventf(
+		ctx, 2, executingParallelAndSerialChecks, numParallelChecks, len(checkPlans)-numParallelChecks,
+	)
 
 	// Set up a wait group so that the main (current) goroutine can block until
 	// all concurrent checks return. We cannot short-circuit if one of the
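Reviewer note: the log format string is promoted to a package-level constant so that a test can rebuild the exact message and search for it in a trace recording. A condensed sketch of that pattern follows; the helper name and package are hypothetical, the constant is redefined locally for self-containment, and the tracingpb.Recording type name is an assumption from memory.

package sketch

import (
	"fmt"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/util/tracing"
	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
)

// Mirrors the constant added to distsql_running.go above.
const executingParallelAndSerialChecks = "executing %d checks concurrently and %d checks serially"

// requireChecksInTrace rebuilds the exact log message emitted by
// planAndRunChecksInParallel and asserts it appears in the statement's trace.
func requireChecksInTrace(t *testing.T, rec tracingpb.Recording, numParallel, numSerial int) {
	msg := fmt.Sprintf(executingParallelAndSerialChecks, numParallel, numSerial)
	if tracing.FindMsgInRecording(rec, msg) == -1 {
		t.Fatalf("didn't find expected message in trace: %s", msg)
	}
}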
diff --git a/pkg/sql/distsql_running_test.go b/pkg/sql/distsql_running_test.go
index de4b2400b039..4c43e39a61d8 100644
--- a/pkg/sql/distsql_running_test.go
+++ b/pkg/sql/distsql_running_test.go
@@ -41,6 +41,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/randutil"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/datadriven"
@@ -197,6 +198,203 @@ func TestDistSQLRunningInAbortedTxn(t *testing.T) {
 	}
 }
 
+
+// TestDistSQLRunningParallelFKChecksAfterAbort simulates a SQL transaction
+// that writes two rows required to validate a FK check and then proceeds to
+// write a third row that would actually trigger this check. The transaction is
+// aborted after the third row is written but before the FK check is performed.
+// We assert that this construction doesn't throw a FK violation; instead, the
+// transaction should be able to retry.
+// This test serves as a regression test for the hazard identified in
+// https://github.com/cockroachdb/cockroach/issues/97141.
+func TestDistSQLRunningParallelFKChecksAfterAbort(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	ctx := context.Background()
+	mu := struct {
+		syncutil.Mutex
+		abortTxn func(uuid uuid.UUID)
+	}{}
+
+	s, conn, db := serverutils.StartServer(t, base.TestServerArgs{
+		Knobs: base.TestingKnobs{
+			DistSQL: &execinfra.TestingKnobs{
+				RunBeforeCascadesAndChecks: func(txnID uuid.UUID) {
+					mu.Lock()
+					defer mu.Unlock()
+					if mu.abortTxn != nil {
+						mu.abortTxn(txnID)
+					}
+				},
+			},
+		},
+	})
+	defer s.Stopper().Stop(ctx)
+	sqlDB := sqlutils.MakeSQLRunner(conn)
+
+	// Set up schemas for the test. We want a construction that results in 2 FK
+	// checks, of which 1 is done in parallel.
+	sqlDB.Exec(t, "create database test")
+	sqlDB.Exec(t, "create table test.parent1(a INT PRIMARY KEY)")
+	sqlDB.Exec(t, "create table test.parent2(b INT PRIMARY KEY)")
+	sqlDB.Exec(
+		t,
+		"create table test.child(a INT, b INT, FOREIGN KEY (a) REFERENCES test.parent1(a), FOREIGN KEY (b) REFERENCES test.parent2(b))",
+	)
+	key := roachpb.Key("a")
+
+	setupQueries := []string{
+		"insert into test.parent1 VALUES(1)",
+		"insert into test.parent2 VALUES(2)",
+	}
+	query := "insert into test.child VALUES(1, 2)"
+
+	createPlannerAndRunQuery := func(ctx context.Context, txn *kv.Txn, query string) error {
+		execCfg := s.ExecutorConfig().(ExecutorConfig)
+		// Plan the statement.
+		internalPlanner, cleanup := NewInternalPlanner(
+			"test",
+			txn,
+			username.RootUserName(),
+			&MemoryMetrics{},
+			&execCfg,
+			sessiondatapb.SessionData{},
+		)
+		defer cleanup()
+		p := internalPlanner.(*planner)
+		stmt, err := parser.ParseOne(query)
+		require.NoError(t, err)
+
+		rw := NewCallbackResultWriter(func(ctx context.Context, row tree.Datums) error {
+			return nil
+		})
+		recv := MakeDistSQLReceiver(
+			ctx,
+			rw,
+			stmt.AST.StatementReturnType(),
+			execCfg.RangeDescriptorCache,
+			txn,
+			execCfg.Clock,
+			p.ExtendedEvalContext().Tracing,
+		)
+
+		p.stmt = makeStatement(stmt, clusterunique.ID{})
+		if err := p.makeOptimizerPlan(ctx); err != nil {
+			t.Fatal(err)
+		}
+		defer p.curPlan.close(ctx)
+
+		evalCtx := p.ExtendedEvalContext()
+		// We need distribute = true so that executing the plan involves marshaling
+		// the root txn meta to leaf txns.
+		planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, p, txn, DistributionTypeNone)
+		planCtx.stmtType = recv.stmtType
+
+		evalCtxFactory := func(bool) *extendedEvalContext {
+			factoryEvalCtx := extendedEvalContext{Tracing: evalCtx.Tracing}
+			//factoryEvalCtx.Context = evalCtx.Context
+			return &factoryEvalCtx
+		}
+		err = execCfg.DistSQLPlanner.PlanAndRunAll(ctx, evalCtx, planCtx, p, recv, evalCtxFactory)
+		if err != nil {
+			return err
+		}
+		return rw.Err()
+	}
+
+	push := func(ctx context.Context, key roachpb.Key) error {
+		// Conflicting transaction that pushes another transaction.
+		conflictTxn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */)
+		// We need to explicitly set a high priority for the push to happen.
+		if err := conflictTxn.SetUserPriority(roachpb.MaxUserPriority); err != nil {
+			return err
+		}
+		// Push through a Put, as opposed to a Get, so that the pushee gets aborted.
+		if err := conflictTxn.Put(ctx, key, "pusher was here"); err != nil {
+			return err
+		}
+		err := conflictTxn.Commit(ctx)
+		require.NoError(t, err)
+		t.Log(conflictTxn.Rollback(ctx))
+		return err
+	}
+
+	// Make a db with a short heartbeat interval, so that the aborted txn finds
+	// out quickly.
+	ambient := s.AmbientCtx()
+	tsf := kvcoord.NewTxnCoordSenderFactory(
+		kvcoord.TxnCoordSenderFactoryConfig{
+			AmbientCtx: ambient,
+			// Short heartbeat interval.
+			HeartbeatInterval: time.Millisecond,
+			Settings:          s.ClusterSettings(),
+			Clock:             s.Clock(),
+			Stopper:           s.Stopper(),
+		},
+		s.DistSenderI().(*kvcoord.DistSender),
+	)
+	shortDB := kv.NewDB(ambient, tsf, s.Clock(), s.Stopper())
+
+	iter := 0
+	// We'll trace to make sure the test isn't fooling itself.
+	tr := s.TracerI().(*tracing.Tracer)
+	runningCtx, getRecAndFinish := tracing.ContextWithRecordingSpan(ctx, tr, "test")
+	defer getRecAndFinish()
+	err := shortDB.Txn(runningCtx, func(ctx context.Context, txn *kv.Txn) error {
+		iter++
+
+		// set up the test.
+		for _, query := range setupQueries {
+			err := createPlannerAndRunQuery(ctx, txn, query)
+			require.NoError(t, err)
+		}
+
+		if iter == 1 {
+			// On the first iteration, abort the txn by setting the abortTxn function.
+			mu.Lock()
+			mu.abortTxn = func(txnID uuid.UUID) {
+				if txnID != txn.ID() {
+					return // not our txn
+				}
+				if err := txn.Put(ctx, key, "val"); err != nil {
+					t.Fatal(err)
+				}
+				if err := push(ctx, key); err != nil {
+					t.Fatal(err)
+				}
+				// Now wait until the heartbeat loop notices that the transaction is aborted.
+				testutils.SucceedsSoon(t, func() error {
+					if txn.Sender().(*kvcoord.TxnCoordSender).IsTracking() {
+						return fmt.Errorf("txn heartbeat loop running")
+					}
+					return nil
+				})
+			}
+			mu.Unlock()
+			defer func() {
+				// clear the abortTxn function before returning.
+				mu.Lock()
+				mu.abortTxn = nil
+				mu.Unlock()
+			}()
+		}
+
+		// Execute the FK checks.
+		return createPlannerAndRunQuery(ctx, txn, query)
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	require.Equal(t, iter, 2)
+	if tracing.FindMsgInRecording(getRecAndFinish(), clientRejectedMsg) == -1 {
+		t.Fatalf("didn't find expected message in trace: %s", clientRejectedMsg)
+	}
+	concurrentFKChecksLogMessage := fmt.Sprintf(executingParallelAndSerialChecks, 1, 1)
+	if tracing.FindMsgInRecording(getRecAndFinish(), concurrentFKChecksLogMessage) == -1 {
+		t.Fatalf("didn't find expected message in trace: %s", concurrentFKChecksLogMessage)
+	}
+}
+
 // Test that the DistSQLReceiver overwrites previous errors as "better" errors
 // come along.
 func TestDistSQLReceiverErrorRanking(t *testing.T) {
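Reviewer note: the reason the aborted transaction above surfaces as a retry rather than a spurious FK violation is the GetLeafTxnInputStateOrRejectClient behavior documented in the txn.go hunk: once the root transaction has recorded the retryable error, attempts to build leaf transactions for the check plans are rejected up front. A rough sketch of that call site follows; the function name and package are hypothetical, the kv.Txn method and types are real.

package sketch

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// buildLeafInputState is a condensed, hypothetical stand-in for the planner
// code that needs leaf transactions for the FK-check plans.
func buildLeafInputState(ctx context.Context, txn *kv.Txn) (*roachpb.LeafTxnInputState, error) {
	leafInputState, err := txn.GetLeafTxnInputStateOrRejectClient(ctx)
	if err != nil {
		// For an aborted transaction this is a retryable
		// TransactionRetryWithProtoRefreshError: the client sees the
		// "client rejected ..." message and retries the statement after
		// PrepareForRetry, instead of the check running on stale state and
		// reporting a violation.
		return nil, err
	}
	return leafInputState, nil
}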
diff --git a/pkg/sql/execinfra/server_config.go b/pkg/sql/execinfra/server_config.go
index 82267d185c6d..ae1975c9cd9e 100644
--- a/pkg/sql/execinfra/server_config.go
+++ b/pkg/sql/execinfra/server_config.go
@@ -42,6 +42,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/mon"
 	"github.com/cockroachdb/cockroach/pkg/util/stop"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 	"github.com/cockroachdb/errors"
 	"github.com/marusama/semaphore"
 )
@@ -298,6 +299,11 @@ type TestingKnobs struct {
 	// when responding to SetupFlow RPCs, after the flow is set up but before it
 	// is started.
 	SetupFlowCb func(context.Context, base.SQLInstanceID, *execinfrapb.SetupFlowRequest) error
+
+	// RunBeforeCascadesAndChecks is run before any cascade or check queries are
+	// run. The associated transaction ID of the statement performing the cascade
+	// or check query is passed in as an argument.
+	RunBeforeCascadesAndChecks func(txnID uuid.UUID)
 }
 
 // ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
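Reviewer note: a condensed usage sketch for the new knob, mirroring what TestDistSQLRunningParallelFKChecksAfterAbort above does; the test name and the log statement are illustrative, the knob wiring and the identifiers are from this patch.

package sketch

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
	"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

// TestKnobUsageSketch shows how a test installs RunBeforeCascadesAndChecks.
func TestKnobUsageSketch(t *testing.T) {
	ctx := context.Background()
	s, _, _ := serverutils.StartServer(t, base.TestServerArgs{
		Knobs: base.TestingKnobs{
			DistSQL: &execinfra.TestingKnobs{
				// Called once per statement, after the main plan has run but
				// before its cascades and checks are planned; receives that
				// statement's transaction ID.
				RunBeforeCascadesAndChecks: func(txnID uuid.UUID) {
					t.Logf("about to run cascades/checks for txn %s", txnID)
				},
			},
		},
	})
	defer s.Stopper().Stop(ctx)
}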