sql,kv: bubble up retry errors when creating leaf transactions
Previously, if we detected that the transaction was aborted when
trying to construct leaf transaction state, we would handle the retry
error instead of bubbling it up to the caller. When a transaction is
aborted, the `TransactionRetryWithProtoRefreshError` carries with it a
new transaction that should be used for subsequent attempts. Handling
the retry error entailed swapping out the old `TxnCoordSender` with a
new one -- one that is associated with this new transaction.

This is bug-prone when trying to create multiple leaf transactions in
parallel if the root has been aborted: the first leaf transaction to
handle the error swaps out the `TxnCoordSender`, and all subsequent
leaf transactions then point to the new transaction. This wasn't an
issue before, as we never really created multiple leaf transactions in
parallel. That changed recently in 0f4b431, which started parallelizing
FK and uniqueness checks. After that change, we could see FK or
uniqueness violations when in fact the transaction needed to be
retried.
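
To make the hazard concrete, here is a minimal, hypothetical Go sketch
(not code from this patch): `rootTxn` is assumed to be a `*kv.Txn` whose
underlying transaction has already been aborted, and `errgroup` stands in
for whatever concurrency primitive drives the parallel checks.

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"golang.org/x/sync/errgroup"
)

// buildLeafStatesConcurrently builds leaf transaction state from the same
// root txn on two goroutines, the way the parallel FK/uniqueness checks do.
func buildLeafStatesConcurrently(ctx context.Context, rootTxn *kv.Txn) error {
	var g errgroup.Group
	for i := 0; i < 2; i++ {
		g.Go(func() error {
			// Before this patch, the first goroutine to observe the abort
			// handled the retry error internally and swapped out the
			// TxnCoordSender; the other goroutine could then build its leaf
			// from the new transaction and report a spurious FK or uniqueness
			// violation. With this patch, both goroutines simply get the
			// retryable error back.
			_, err := rootTxn.GetLeafTxnInputStateOrRejectClient(ctx)
			return err
		})
	}
	return g.Wait()
}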

This patch fixes the issue described above by not handling the retry
error when creating leaf transactions. Instead, we expect the
ConnExecutor to retry the entire transaction and prepare it for another
iteration.

Fixes cockroachdb#97141

Epic: none

Release note: None
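
As a rough sketch of the caller-side contract after this change: the
retryable error now reaches the caller unchanged, and it is the
ConnExecutor's job to retry the whole transaction. The helper below is
hypothetical (`leafStateOrRetry` is not part of this patch) and assumes
the package layout implied by the diff; it only relies on the
`GetLeafTxnInputStateOrRejectClient` signature and error type shown there.

package example

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/errors"
)

// leafStateOrRetry is a hypothetical wrapper around
// GetLeafTxnInputStateOrRejectClient. The bool reports whether the returned
// error is the retryable kind that the ConnExecutor is expected to act on.
func leafStateOrRetry(
	ctx context.Context, txn *kv.Txn,
) (*roachpb.LeafTxnInputState, bool, error) {
	tis, err := txn.GetLeafTxnInputStateOrRejectClient(ctx)
	if err != nil {
		// With this patch the method no longer swaps out the TxnCoordSender
		// on abort; the retryable error is returned unchanged so the entire
		// SQL transaction can be retried on the next iteration.
		var retryErr *kvpb.TransactionRetryWithProtoRefreshError
		return nil, errors.As(err, &retryErr), err
	}
	return tis, false, nil
}
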
arulajmani committed Mar 15, 2023
1 parent 62faa26 commit 582b601
Showing 4 changed files with 205 additions and 10 deletions.
12 changes: 3 additions & 9 deletions pkg/kv/txn.go
@@ -1251,10 +1251,9 @@ func (txn *Txn) GetLeafTxnInputState(ctx context.Context) *roachpb.LeafTxnInputS

// GetLeafTxnInputStateOrRejectClient is like GetLeafTxnInputState
// except, if the transaction is already aborted or otherwise in state
// that cannot make progress, it returns an error. If the transaction
// is aborted, the error will be a retryable one, and the transaction
// will have been prepared for another transaction attempt (so, on
// retryable errors, it acts like Send()).
// that cannot make progress, it returns an error. If the transaction is aborted,
// the error returned will be a retryable one; as such, the caller is
// responsible for handling the error before another attempt.
func (txn *Txn) GetLeafTxnInputStateOrRejectClient(
ctx context.Context,
) (*roachpb.LeafTxnInputState, error) {
@@ -1267,10 +1266,6 @@ func (txn *Txn) GetLeafTxnInputStateOrRejectClient(
defer txn.mu.Unlock()
tfs, err := txn.mu.sender.GetLeafTxnInputState(ctx, OnlyPending)
if err != nil {
var retryErr *kvpb.TransactionRetryWithProtoRefreshError
if errors.As(err, &retryErr) {
txn.handleRetryableErrLocked(ctx, retryErr)
}
return nil, err
}
return tfs, nil
@@ -1339,7 +1334,6 @@ func (txn *Txn) UpdateStateOnRemoteRetryableErr(ctx context.Context, pErr *kvpb.
}

pErr = txn.mu.sender.UpdateStateOnRemoteRetryableErr(ctx, pErr)
txn.replaceRootSenderIfTxnAbortedLocked(ctx, pErr.GetDetail().(*kvpb.TransactionRetryWithProtoRefreshError), origTxnID)

return pErr.GoError()
}
9 changes: 8 additions & 1 deletion pkg/sql/distsql_running.go
@@ -640,6 +640,7 @@ func (dsp *DistSQLPlanner) setupFlows(
}

const clientRejectedMsg string = "client rejected when attempting to run DistSQL plan"
const executingParallelAndSerialChecks = "executing %d checks concurrently and %d checks serially"

// Run executes a physical plan. The plan should have been finalized using
// FinalizePlan.
@@ -1582,6 +1583,10 @@ func (dsp *DistSQLPlanner) PlanAndRunAll(
return recv.commErr
}

if fn := evalCtx.ExecCfg.DistSQLRunTestingKnobs.RunBeforeCascadesAndChecks; fn != nil {
fn(planner.Txn().ID())
}

dsp.PlanAndRunCascadesAndChecks(
ctx, planner, evalCtxFactory, &planner.curPlan.planComponents, recv,
)
@@ -2212,7 +2217,9 @@ func (dsp *DistSQLPlanner) planAndRunChecksInParallel(
numParallelChecks--
}

log.VEventf(ctx, 2, "executing %d checks concurrently and %d checks serially", numParallelChecks, len(checkPlans)-numParallelChecks)
log.VEventf(
ctx, 2, executingParallelAndSerialChecks, numParallelChecks, len(checkPlans)-numParallelChecks,
)

// Set up a wait group so that the main (current) goroutine can block until
// all concurrent checks return. We cannot short-circuit if one of the
188 changes: 188 additions & 0 deletions pkg/sql/distsql_running_test.go
@@ -197,6 +197,194 @@ func TestDistSQLRunningInAbortedTxn(t *testing.T) {
}
}

// TestDistSQLRunningParallelFKChecksAfterAbort simulates a SQL transaction
// that writes two rows required to validate a FK check and then proceeds to
// write a third row that would actually trigger this check. The transaction is
// aborted after the third row is written but before the FK check is performed.
// We assert that this construction doesn't throw a FK violation; instead, the
// transaction should be able to retry.
// This test serves as a regression test for the hazard identified in
// https://github.com/cockroachdb/cockroach/issues/97141.
func TestDistSQLRunningParallelFKChecksAfterAbort(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
var abortTxn func(uuid uuid.UUID)

s, conn, db := serverutils.StartServer(t, base.TestServerArgs{
Knobs: base.TestingKnobs{
DistSQL: &execinfra.TestingKnobs{
RunBeforeCascadesAndChecks: func(txnID uuid.UUID) {
if abortTxn != nil {
abortTxn(txnID)
}
},
},
},
})
defer s.Stopper().Stop(ctx)
sqlDB := sqlutils.MakeSQLRunner(conn)

// Set up schemas for the test. We want a construction that results in 2 FK
// checks, of which 1 is done in parallel.
sqlDB.Exec(t, "create database test")
sqlDB.Exec(t, "create table test.parent1(a INT PRIMARY KEY)")
sqlDB.Exec(t, "create table test.parent2(b INT PRIMARY KEY)")
sqlDB.Exec(
t,
"create table test.child(a INT, b INT, FOREIGN KEY (a) REFERENCES test.parent1(a), FOREIGN KEY (b) REFERENCES test.parent2(b))",
)
key := roachpb.Key("a")

setupQueries := []string{
"insert into test.parent1 VALUES(1)",
"insert into test.parent2 VALUES(2)",
}
query := "insert into test.child VALUES(1, 2)"

createPlannerAndRunQuery := func(ctx context.Context, txn *kv.Txn, query string) error {
execCfg := s.ExecutorConfig().(ExecutorConfig)
// Plan the statement.
internalPlanner, cleanup := NewInternalPlanner(
"test",
txn,
username.RootUserName(),
&MemoryMetrics{},
&execCfg,
sessiondatapb.SessionData{},
)
defer cleanup()
p := internalPlanner.(*planner)
stmt, err := parser.ParseOne(query)
require.NoError(t, err)

rw := NewCallbackResultWriter(func(ctx context.Context, row tree.Datums) error {
return nil
})
recv := MakeDistSQLReceiver(
ctx,
rw,
stmt.AST.StatementReturnType(),
execCfg.RangeDescriptorCache,
txn,
execCfg.Clock,
p.ExtendedEvalContext().Tracing,
)

p.stmt = makeStatement(stmt, clusterunique.ID{})
if err := p.makeOptimizerPlan(ctx); err != nil {
t.Fatal(err)
}
defer p.curPlan.close(ctx)

evalCtx := p.ExtendedEvalContext()
// We need distribute = true so that executing the plan involves marshaling
// the root txn meta to leaf txns.
planCtx := execCfg.DistSQLPlanner.NewPlanningCtx(ctx, evalCtx, p, nil,
DistributionTypeSystemTenantOnly)
planCtx.stmtType = recv.stmtType

evalCtxFactory := func(bool) *extendedEvalContext {
return evalCtx
}
err = execCfg.DistSQLPlanner.PlanAndRunAll(ctx, evalCtx, planCtx, p, recv, evalCtxFactory)
if err != nil {
return err
}
return rw.Err()
}

push := func(ctx context.Context, key roachpb.Key) error {
// Conflicting transaction that pushes another transaction.
conflictTxn := kv.NewTxn(ctx, db, 0 /* gatewayNodeID */)
// We need to explicitly set a high priority for the push to happen.
if err := conflictTxn.SetUserPriority(roachpb.MaxUserPriority); err != nil {
return err
}
// Push through a Put, as opposed to a Get, so that the pushee gets aborted.
if err := conflictTxn.Put(ctx, key, "pusher was here"); err != nil {
return err
}
err := conflictTxn.Commit(ctx)
require.NoError(t, err)
t.Log(conflictTxn.Rollback(ctx))
return err
}

// Make a db with a short heartbeat interval, so that the aborted txn finds
// out quickly.
ambient := s.AmbientCtx()
tsf := kvcoord.NewTxnCoordSenderFactory(
kvcoord.TxnCoordSenderFactoryConfig{
AmbientCtx: ambient,
// Short heartbeat interval.
HeartbeatInterval: time.Millisecond,
Settings: s.ClusterSettings(),
Clock: s.Clock(),
Stopper: s.Stopper(),
},
s.DistSenderI().(*kvcoord.DistSender),
)
shortDB := kv.NewDB(ambient, tsf, s.Clock(), s.Stopper())

iter := 0
// We'll trace to make sure the test isn't fooling itself.
tr := s.TracerI().(*tracing.Tracer)
runningCtx, getRecAndFinish := tracing.ContextWithRecordingSpan(ctx, tr, "test")
defer getRecAndFinish()
err := shortDB.Txn(runningCtx, func(ctx context.Context, txn *kv.Txn) error {
iter++

// set up the test.
for _, query := range setupQueries {
err := createPlannerAndRunQuery(ctx, txn, query)
require.NoError(t, err)
}

if iter == 1 {
// On the first iteration, abort the txn by setting the abortTxn function.
abortTxn = func(txnID uuid.UUID) {
if txnID != txn.ID() {
return // not our txn
}
if err := txn.Put(ctx, key, "val"); err != nil {
t.Fatal(err)
}
if err := push(ctx, key); err != nil {
t.Fatal(err)
}
// Now wait until the heartbeat loop notices that the transaction is aborted.
testutils.SucceedsSoon(t, func() error {
if txn.Sender().(*kvcoord.TxnCoordSender).IsTracking() {
return fmt.Errorf("txn heartbeat loop running")
}
return nil
})
}
}

// Execute the FK checks.
err := createPlannerAndRunQuery(ctx, txn, query)
if iter == 1 {
// clear the abortTxn function before returning.
abortTxn = nil
}
return err
})
if err != nil {
t.Fatal(err)
}
require.Equal(t, iter, 2)
if tracing.FindMsgInRecording(getRecAndFinish(), clientRejectedMsg) == -1 {
t.Fatalf("didn't find expected message in trace: %s", clientRejectedMsg)
}
concurrentFKChecksLogMessage := fmt.Sprintf(executingParallelAndSerialChecks, 1, 1)
if tracing.FindMsgInRecording(getRecAndFinish(), concurrentFKChecksLogMessage) == -1 {
t.Fatalf("didn't find expected message in trace: %s", concurrentFKChecksLogMessage)
}
}

// Test that the DistSQLReceiver overwrites previous errors as "better" errors
// come along.
func TestDistSQLReceiverErrorRanking(t *testing.T) {
6 changes: 6 additions & 0 deletions pkg/sql/execinfra/server_config.go
@@ -42,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/marusama/semaphore"
)
@@ -298,6 +299,11 @@ type TestingKnobs struct {
// when responding to SetupFlow RPCs, after the flow is set up but before it
// is started.
SetupFlowCb func(context.Context, base.SQLInstanceID, *execinfrapb.SetupFlowRequest) error

// RunBeforeCascadesAndChecks is run before any cascade or check queries are
// run. The associated transaction ID of the statement performing the cascade
// or check query is passed in as an argument.
RunBeforeCascadesAndChecks func(txnID uuid.UUID)
}

// ModuleTestingKnobs is part of the base.ModuleTestingKnobs interface.
