From b236e3774e260aa3df1a620dd95f23dcc56d496b Mon Sep 17 00:00:00 2001
From: Alex Sarkesian
Date: Thu, 11 May 2023 12:58:56 -0400
Subject: [PATCH] kvserver: add test for transaction unexpectedly committed

This adds a unit test to reproduce the behavior described in #103817 and
seen in #67765: a bug in our implementation of the parallel commit
protocol that results in the assertion failure known as
`transaction unexpectedly committed`. The test currently validates the
known incorrect behavior, though it is intended to be used to validate
the potential fixes proposed in #103817.

Release note: None

Part of: #103817
---
 pkg/kv/kvpb/batch.go                           |  13 +
 pkg/kv/kvserver/BUILD.bazel                    |   4 +
 .../kvserver/batcheval/cmd_lease_request.go    |   2 +
 .../kvserver/client_unexpected_commit_test.go  | 499 ++++++++++++++++++
 4 files changed, 518 insertions(+)
 create mode 100644 pkg/kv/kvserver/client_unexpected_commit_test.go

diff --git a/pkg/kv/kvpb/batch.go b/pkg/kv/kvpb/batch.go
index d2a01cc266c1..ddb84b3b5ee4 100644
--- a/pkg/kv/kvpb/batch.go
+++ b/pkg/kv/kvpb/batch.go
@@ -258,6 +258,12 @@ func (ba *BatchRequest) IsSinglePushTxnRequest() bool {
 	return ba.isSingleRequestWithMethod(PushTxn)
 }
 
+// IsSingleRecoverTxnRequest returns true iff the batch contains a single request,
+// and that request is a RecoverTxnRequest.
+func (ba *BatchRequest) IsSingleRecoverTxnRequest() bool {
+	return ba.isSingleRequestWithMethod(RecoverTxn)
+}
+
 // IsSingleHeartbeatTxnRequest returns true iff the batch contains a single
 // request, and that request is a HeartbeatTxn.
 func (ba *BatchRequest) IsSingleHeartbeatTxnRequest() bool {
@@ -827,6 +833,13 @@ func (ba BatchRequest) SafeFormat(s redact.SafePrinter, _ rune) {
 				s.Printf(" %s", et.InternalCommitTrigger.Kind())
 			}
 			s.Printf(") [%s]", h.Key)
+		} else if rt, ok := req.(*RecoverTxnRequest); ok {
+			h := req.Header()
+			s.Printf("%s(%s", req.Method(), rt.Txn.Short())
+			if rt.ImplicitlyCommitted {
+				s.Printf(", implicitly committed")
+			}
+			s.Printf(") [%s]", h.Key)
 		} else {
 			h := req.Header()
 			if req.Method() == PushTxn {
diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel
index 4f2f2ec0c460..47b6bfaa9865 100644
--- a/pkg/kv/kvserver/BUILD.bazel
+++ b/pkg/kv/kvserver/BUILD.bazel
@@ -274,6 +274,7 @@ go_test(
         "client_store_test.go",
         "client_tenant_test.go",
         "client_test.go",
+        "client_unexpected_commit_test.go",
         "closed_timestamp_test.go",
         "consistency_queue_test.go",
         "debug_print_test.go",
@@ -520,7 +521,10 @@ go_test(
         "@io_etcd_go_raft_v3//raftpb",
         "@io_etcd_go_raft_v3//tracker",
         "@org_golang_google_grpc//:go_default_library",
+        "@org_golang_google_grpc//codes",
         "@org_golang_google_grpc//metadata",
+        "@org_golang_google_grpc//status",
+        "@org_golang_x_net//trace",
         "@org_golang_x_sync//errgroup",
         "@org_golang_x_sync//syncmap",
         "@org_golang_x_time//rate",
diff --git a/pkg/kv/kvserver/batcheval/cmd_lease_request.go b/pkg/kv/kvserver/batcheval/cmd_lease_request.go
index 590e33c39b4e..3e0abd0fc158 100644
--- a/pkg/kv/kvserver/batcheval/cmd_lease_request.go
+++ b/pkg/kv/kvserver/batcheval/cmd_lease_request.go
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
 )
 
 func init() {
@@ -159,6 +160,7 @@ func RequestLease(
 		priorReadSum = &worstCaseSum
 	}
 
+	log.VEventf(ctx, 2, "lease request: prev lease: %+v, new lease: %+v", prevLease, newLease)
 	return evalNewLease(ctx,
 		cArgs.EvalCtx, readWriter, cArgs.Stats, newLease, prevLease, priorReadSum, isExtension, false /* isTransfer */)
 }
diff --git a/pkg/kv/kvserver/client_unexpected_commit_test.go b/pkg/kv/kvserver/client_unexpected_commit_test.go
new file mode 100644
index 000000000000..80f1ee227757
--- /dev/null
+++ b/pkg/kv/kvserver/client_unexpected_commit_test.go
@@ -0,0 +1,499 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package kvserver_test
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
+	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
+	"github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap"
+	"github.com/cockroachdb/cockroach/pkg/storage"
+	"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
+	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
+	"github.com/cockroachdb/cockroach/pkg/ts"
+	"github.com/cockroachdb/cockroach/pkg/util/encoding"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"google.golang.org/grpc/codes"
+	grpcstatus "google.golang.org/grpc/status"
+)
+
+type interceptingTransport struct {
+	kvcoord.Transport
+	intercept func(context.Context, *kvpb.BatchRequest) (*kvpb.BatchResponse, error)
+}
+
+func (t *interceptingTransport) SendNext(
+	ctx context.Context, ba *kvpb.BatchRequest,
+) (*kvpb.BatchResponse, error) {
+	return t.intercept(ctx, ba)
+}
+
+func getJamKey(
+	txnName string, fromNodeID, toNodeID roachpb.NodeID, rangeID roachpb.RangeID,
+) string {
+	return fmt.Sprintf("%s:n%d->n%d:r%d", txnName, fromNodeID, toNodeID, rangeID)
+}
+
+// TestTransactionUnexpectedlyCommitted validates the handling of the case where
+// a parallel commit transaction with an ambiguous error on a write races with
+// a contending transaction's recovery attempt. In the case that the recovery
+// succeeds prior to the original transaction's retries, an ambiguous error
+// should be raised.
+//
+// NB: This case encounters a known issue described in #103817 and seen in #67765,
+// where it is currently surfaced as an assertion failure that results in a
+// node crash.
+//
+// TODO(sarkesian): Validate the ambiguous result error once the initial fix
+// outlined in #103817 has been implemented.
+func TestTransactionUnexpectedlyCommitted(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+
+	// This test is slow and depends on an intricate sequencing of events, so it
+	// should not be run under short/race/deadlock.
+	skip.UnderShort(t)
+	skip.UnderRace(t)
+	skip.UnderDeadlock(t)
+
+	// Checkpoints in test.
+	var jammedOps sync.Map // map[string]chan struct{}
+	txn1Ready := make(chan struct{})
+	txn1MetaCh := make(chan *enginepb.TxnMeta, 1)
+	txn2Ready := make(chan struct{})
+	leaseMoveReady := make(chan struct{})
+	recoverReady := make(chan struct{})
+	recoverComplete := make(chan struct{})
+	networkManipReady := make(chan struct{})
+	networkManipComplete := make(chan struct{})
+
+	// Final result.
+	txn1ResultCh := make(chan error, 1)
+
+	// Test helpers.
+	dbg := func(format string, args ...any) {
+		t.Helper()
+		t.Logf("[dbg] "+format, args...)
+	}
+
+	dumpKVs := func(scanResults []kv.KeyValue) {
+		t.Helper()
+		for _, kv := range scanResults {
+			mvccValue, err := storage.DecodeMVCCValue(kv.Value.RawBytes)
+			require.NoError(t, err)
+			t.Logf("key: %s, value: %s", kv.Key, mvccValue)
+		}
+	}
+
+	// Network jamming helper functions.
+	jamOperation := func(jamKey string) {
+		dbg("Jamming %s", jamKey)
+		jamChan := make(chan struct{})
+		jammedOps.Store(jamKey, jamChan)
+	}
+
+	unjamOperation := func(jamKey string) {
+		var jamChan chan struct{}
+		if jamVal, ok := jammedOps.Load(jamKey); ok {
+			jamChan = jamVal.(chan struct{})
+		} else {
+			t.Fatalf("couldn't find operation to unjam: %s", jamKey)
+		}
+
+		close(jamChan)
+	}
+
+	// Handle all synchronization of KV operations at the transport level.
+	// This allows us to schedule the operations such that they look like:
+	// txn1: read a (n1), b (n2)
+	// txn1: write a, endtxn (n1), write b (n2) encounters network failure
+	// txn2: read a, b; contends on locks held by txn1 and issues push
+	// _: the push kicks off recovery of txn1
+	//
+	// txn1: reattempt failed write b (n1) and attempt to finalize transaction
+	//
+	// TODO(sarkesian): We currently see these operations, though raising
+	// ambiguous errors will require updating this schedule of operations.
+	// txn1->n1: Get(a)
+	// txn1->n2: Get(b)
+	// txn1->n1: Put(a), EndTxn(parallel commit)
+	// txn1->n2: Put(b) -- network failure!
+	// txn2->n1: Get(a)
+	// _->n1: PushTxn(txn2->txn1)
+	//
+	// txn1->n1: Put(b) -- retry sees new lease start timestamp
+	// txn1->n1: Refresh(a)
+	// txn1->n1: Refresh(b)
+	// _->n1: RecoverTxn(txn1) -- due to the PushTxn seeing txn1 in staging
+	// txn1->n1: EndTxn(commit) -- results in transaction unexpectedly committed!
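+	//
+	// For context on why recovery can finalize txn1 here (per the parallel
+	// commit protocol): txn1's EndTxn left its record in STAGING, and the
+	// Put(b) intent was durably written even though the response was lost to
+	// the network failure. RecoverTxn therefore finds all of txn1's in-flight
+	// writes present and marks the transaction as (implicitly) committed,
+	// while txn1 itself, having seen the write fail, retries the write at a
+	// higher timestamp and eventually sends an EndTxn that discovers the
+	// already-committed record.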
+ getInterceptingTransportFactory := func(nID roachpb.NodeID) kvcoord.TransportFactory { + opID := 0 + return func(options kvcoord.SendOptions, dialer *nodedialer.Dialer, slice kvcoord.ReplicaSlice) (kvcoord.Transport, error) { + transport, err := kvcoord.GRPCTransportFactory(options, dialer, slice) + interceptor := &interceptingTransport{ + Transport: transport, + intercept: func(ctx context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { + fromNodeID := nID + toNodeID := transport.NextReplica().NodeID + toRangeID := ba.RangeID + toReplicaID := transport.NextReplica().ReplicaID + txnName := "_" + var txnMeta *enginepb.TxnMeta + if ba.Txn != nil { + txnName = ba.Txn.Name + txnMeta = &ba.Txn.TxnMeta + } + jamKey := getJamKey(txnName, fromNodeID, toNodeID, toRangeID) + var jammed bool + var jamChan chan struct{} + var tags strings.Builder + + if jamVal, ok := jammedOps.Load(jamKey); ok { + jamChan = jamVal.(chan struct{}) + } + + if ((txnName == "txn1" || txnName == "txn2") && !ba.IsSingleHeartbeatTxnRequest()) || + ba.IsSingleRecoverTxnRequest() || + (ba.IsSinglePushTxnRequest() && ba.Requests[0].GetPushTxn().PusherTxn.Name == "txn2") { + opID++ + if jamChan != nil { + jammed = true + fmt.Fprintf(&tags, "[jammed %d] ", opID) + } else if (txnName == "txn1" && ba.IsSingleEndTxnRequest()) || ba.IsSingleRecoverTxnRequest() { + fmt.Fprintf(&tags, "[paused %d] ", opID) + } + + fmt.Fprintf(&tags, "(%s) n%d->n%d:r%d/%d ", + txnName, fromNodeID, toNodeID, toRangeID, toReplicaID, + ) + + t.Logf("%sbatchReq={%s}, meta={%s}", tags.String(), ba, txnMeta) + } + + // Ensure that txn1's post-refresh EndTxn occurs after recovery. + if txnName == "txn1" && ba.IsSingleEndTxnRequest() { + close(recoverReady) + <-recoverComplete + t.Logf("%sEndTxn op unpaused", tags.String()) + } + + // Block transaction recovery until ready. + if ba.IsSingleRecoverTxnRequest() { + // Once the RecoverTxn request is issued, as part of txn2's PushTxn + // request, the lease can be moved. + close(leaseMoveReady) + <-recoverReady + t.Logf("%sRecoverTxn op unpaused", tags.String()) + } + + br, rpcErr := transport.SendNext(ctx, ba) + + // Once recovery is completed, signal to allow txn1's EndTxn to be retried. + if ba.IsSingleRecoverTxnRequest() { + t.Logf("RECOVERY: op complete, batchResp={%s}", br) + close(recoverComplete) + } + + if txnName == "txn1" && jammed { + // Start intent query loop to wait on txn1's writes. + txn1MetaCh <- &ba.Txn.TxnMeta + } + + if jammed { + <-jamChan + t.Logf("%sop released", tags.String()) + return nil, grpcstatus.Errorf(codes.Unavailable, "response jammed on n%d<-n%d", fromNodeID, toNodeID) + } + + return br, rpcErr + }, + } + return interceptor, err + } + } + + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + + // Disable closed timestamps for greater control over when transaction gets bumped. 
+ closedts.TargetDuration.Override(ctx, &st.SV, 1*time.Hour) + ts.TimeseriesStorageEnabled.Override(ctx, &st.SV, false) + + tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{ + ServerArgs: base.TestServerArgs{ + Settings: st, + Insecure: true, + }, + ServerArgsPerNode: map[int]base.TestServerArgs{ + 0: { + Settings: st, + Insecure: true, + Knobs: base.TestingKnobs{ + KVClient: &kvcoord.ClientTestingKnobs{ + DisableCommitSanityCheck: true, + TransportFactory: getInterceptingTransportFactory(roachpb.NodeID(1)), + }, + Store: &kvserver.StoreTestingKnobs{ + EvalKnobs: kvserverbase.BatchEvalTestingKnobs{ + RecoverIndeterminateCommitsOnFailedPushes: true, + }, + }, + }, + }, + }, + }) + defer tc.Stopper().Stop(ctx) + + requireRangeLease := func(desc roachpb.RangeDescriptor, serverIdx int) { + t.Helper() + testutils.SucceedsSoon(t, func() error { + li, _, err := tc.FindRangeLeaseEx(ctx, desc, nil) + if err != nil { + return errors.Wrapf(err, "could not find lease for %s", desc) + } + curLease := li.Current() + if curLease.Empty() { + return errors.Errorf("could not find lease for %s", desc) + } + expStoreID := tc.Target(serverIdx).StoreID + if !curLease.OwnedBy(expStoreID) { + return errors.Errorf("expected s%d to own the lease for %s\n"+ + "actual lease info: %s", + expStoreID, desc, curLease) + } + return nil + }) + } + + getInBatch := func(ctx context.Context, txn *kv.Txn, keys ...roachpb.Key) []int64 { + batch := txn.NewBatch() + for _, key := range keys { + batch.GetForUpdate(key) + } + assert.NoError(t, txn.Run(ctx, batch)) + assert.Len(t, batch.Results, len(keys)) + vals := make([]int64, len(keys)) + for i, result := range batch.Results { + assert.Len(t, result.Rows, 1) + vals[i] = result.Rows[0].ValueInt() + } + return vals + } + + db := tc.Server(0).DB() + tablePrefix := bootstrap.TestingUserTableDataMin() + + // Dump KVs at end of test for debugging purposes. + defer func() { + scannedKVs, err := db.Scan(ctx, tablePrefix, tablePrefix.PrefixEnd(), 0) + require.NoError(t, err) + dumpKVs(scannedKVs) + }() + + // Split so we have multiple ranges. + // Corresponds to: + _ = ` + CREATE TABLE bank (id INT PRIMARY KEY, balance INT); + INSERT INTO bank VALUES (1, 50), (2, 50); + ALTER TABLE bank SPLIT AT VALUES (2);` + key1 := roachpb.Key(encoding.EncodeUvarintAscending(tablePrefix.Clone(), 1)) + key2 := roachpb.Key(encoding.EncodeUvarintAscending(tablePrefix.Clone(), 2)) + dbg("Putting initial keys") + require.NoError(t, db.Put(ctx, key1, 50)) + require.NoError(t, db.Put(ctx, key2, 50)) + dbg("Splitting ranges") + tc.SplitRangeOrFatal(t, key1) + firstRange, secondRange := tc.SplitRangeOrFatal(t, key2) + t.Logf("first range: %s", firstRange) + t.Logf("second range: %s", secondRange) + + // Separate the leases for each range so they are not on the same node. + dbg("Moving leases") + tc.TransferRangeLeaseOrFatal(t, firstRange, tc.Target(0)) + requireRangeLease(firstRange, 0) + tc.TransferRangeLeaseOrFatal(t, secondRange, tc.Target(1)) + requireRangeLease(secondRange, 1) + + // Execute implicit transaction to move $10 from account 1 to account 2. + // Corresponds to: + _ = ` + UPDATE bank SET balance = + CASE id + WHEN $1 THEN balance - $3 + WHEN $2 THEN balance + $3 + END + WHERE id IN ($1, $2)` + const xferAmount = 10 + jamTxn1Key := getJamKey("txn1", tc.Server(0).NodeID(), tc.Server(1).NodeID(), secondRange.RangeID) + + // Concurrent transactions. + var wg sync.WaitGroup + wg.Add(4) + go func() { + defer wg.Done() + // Wait until txn1 is ready to start. 
+ select { + case <-txn1Ready: + case <-tc.Stopper().ShouldQuiesce(): + t.Logf("txn1 quiescing...") + } + tCtx := context.Background() + txn := db.NewTxn(tCtx, "txn1") + vals := getInBatch(tCtx, txn, key1, key2) + + // Signal for network jamming to occur. + close(networkManipReady) + <-networkManipComplete + + batch := txn.NewBatch() + batch.Put(key1, vals[0]-xferAmount) + batch.Put(key2, vals[1]+xferAmount) + txn1ResultCh <- txn.CommitInBatch(tCtx, batch) + }() + go func() { + defer wg.Done() + // Wait until txn2 is ready to start. + select { + case <-txn2Ready: + case <-tc.Stopper().ShouldQuiesce(): + t.Logf("txn2 quiescing...") + } + tCtx := context.Background() + txn := db.NewTxn(tCtx, "txn2") + vals := getInBatch(tCtx, txn, key1, key2) + + batch := txn.NewBatch() + batch.Put(key1, vals[0]-xferAmount) + batch.Put(key2, vals[1]+xferAmount) + assert.NoError(t, txn.CommitInBatch(tCtx, batch)) + }() + go func() { + defer wg.Done() + // Wait until lease move is ready. + select { + case <-leaseMoveReady: + case <-tc.Stopper().ShouldQuiesce(): + t.Logf("lease mover quiescing...") + } + + dbg("Transferring r%d lease to n%d", secondRange.RangeID, tc.Target(0).NodeID) + tc.TransferRangeLeaseOrFatal(t, secondRange, tc.Target(0)) + + unjamOperation(jamTxn1Key) + }() + go func() { + defer wg.Done() + // Wait until txn1's TxnMeta is available. + var txnMeta *enginepb.TxnMeta + select { + case txnMeta = <-txn1MetaCh: + case <-tc.Stopper().ShouldQuiesce(): + t.Logf("txn1 querier quiescing...") + } + + if txnMeta == nil { + t.Errorf("cannot check txn1's writes, txnMeta is nil") + return + } + + dbg("starting txn1 record/intent query loop: %v", txnMeta) + startTime := time.Now() + tCtx := context.Background() + if err := testutils.SucceedsSoonError(func() error { + meta := *txnMeta + meta.Sequence = enginepb.TxnSeq(0) + var b kv.Batch + b.Header.Timestamp = txnMeta.WriteTimestamp + b.AddRawRequest(&kvpb.QueryTxnRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: txnMeta.Key, + }, + Txn: meta, + }) + b.AddRawRequest(&kvpb.QueryIntentRequest{ + RequestHeader: kvpb.RequestHeader{ + Key: key2, + }, + Txn: meta, + }) + + if err := db.Run(tCtx, &b); err != nil { + return err + } + + queryTxnResp := b.RawResponse().Responses[0].GetInner().(*kvpb.QueryTxnResponse) + if !queryTxnResp.TxnRecordExists { + return errors.New("no txn1 record found") + } + + queryIntentResp := b.RawResponse().Responses[1].GetInner().(*kvpb.QueryIntentResponse) + if !queryIntentResp.FoundIntent { + return errors.Newf("no txn1 intent found on %s", key2) + } + + dbg("found txn1 record & intent in %s, ifwrites: %v", time.Since(startTime), + queryTxnResp.QueriedTxn.InFlightWrites) + return nil + }); err != nil { + t.Errorf("failed to get a valid record/intent for txn1") + return + } + + // Allow txn2 to start once txn1's write on key2 is in-flight but durable. + close(txn2Ready) + }() + + // Test synchronization. + close(txn1Ready) + + // Jam network connections when ready. + <-networkManipReady + dbg("Manipulating network links") + jamOperation(jamTxn1Key) + close(networkManipComplete) + + // Await concurrent operations and validate results. + wg.Wait() + err := <-txn1ResultCh + + // TODO(sarkesian): While we expect an AmbiguousResultError once the immediate + // changes outlined in #103817 are implemented, right now this is essentially + // validating the existence of the bug. This needs to be fixed, and we should + // expect no assertion failures here. 
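+	//
+	// As a rough sketch (hypothetical, not asserted yet), once that fix lands
+	// the checks below would likely become something like:
+	//
+	//	aErr := (*kvpb.AmbiguousResultError)(nil)
+	//	require.Falsef(t, errors.HasAssertionFailure(err), "unexpected assertion failure")
+	//	require.ErrorAsf(t, err, &aErr, "expected AmbiguousResultError on txn1's commit")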
+ tErr := (*kvpb.TransactionStatusError)(nil) + require.Truef(t, errors.HasAssertionFailure(err), + "expected AssertionFailedError due to sanity check on transaction already committed") + require.ErrorAsf(t, err, &tErr, + "expected TransactionStatusError due to being already committed") + require.Equalf(t, kvpb.TransactionStatusError_REASON_TXN_COMMITTED, tErr.Reason, + "expected TransactionStatusError due to being already committed") +}