diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go
index a87af5089bab..9e2ca65a9c79 100644
--- a/pkg/storage/replica.go
+++ b/pkg/storage/replica.go
@@ -3341,11 +3341,16 @@ func isOnePhaseCommit(ba roachpb.BatchRequest) bool {
 // range of keys being written is empty. If so, then the run can be
 // set to put "blindly", meaning no iterator need be used to read
 // existing values during the MVCC write.
-func optimizePuts(batch engine.ReadWriter, reqs []roachpb.RequestUnion, distinctSpans bool) {
+// The caller should use the returned slice (which is either equal to
+// the input slice, or has been shallow-copied appropriately to avoid
+// mutating the original requests).
+func optimizePuts(
+	batch engine.ReadWriter, origReqs []roachpb.RequestUnion, distinctSpans bool,
+) []roachpb.RequestUnion {
 	var minKey, maxKey roachpb.Key
 	var unique map[string]struct{}
 	if !distinctSpans {
-		unique = make(map[string]struct{}, len(reqs))
+		unique = make(map[string]struct{}, len(origReqs))
 	}
 	// Returns false on occurrence of a duplicate key.
 	maybeAddPut := func(key roachpb.Key) bool {
@@ -3365,7 +3370,8 @@ func optimizePuts(batch engine.ReadWriter, reqs []roachpb.RequestUnion, distinct
 		return true
 	}
 
-	for i, r := range reqs {
+	firstUnoptimizedIndex := len(origReqs)
+	for i, r := range origReqs {
 		switch t := r.GetInner().(type) {
 		case *roachpb.PutRequest:
 			if maybeAddPut(t.Key) {
@@ -3376,12 +3382,12 @@ func optimizePuts(batch engine.ReadWriter, reqs []roachpb.RequestUnion, distinct
 				continue
 			}
 		}
-		reqs = reqs[:i]
+		firstUnoptimizedIndex = i
 		break
 	}
 
-	if len(reqs) < optimizePutThreshold { // don't bother if below this threshold
-		return
+	if firstUnoptimizedIndex < optimizePutThreshold { // don't bother if below this threshold
+		return origReqs
 	}
 	iter := batch.NewIterator(false /* total order iterator */)
 	defer iter.Close()
@@ -3397,21 +3403,27 @@ func optimizePuts(batch engine.ReadWriter, reqs []roachpb.RequestUnion, distinct
 	}
 	// Set the prefix of the run which is being written to virgin
 	// keyspace to "blindly" put values.
-	for _, r := range reqs {
-		if iterKey == nil || bytes.Compare(iterKey, r.GetInner().Header().Key) > 0 {
-			switch t := r.GetInner().(type) {
+	reqs := append([]roachpb.RequestUnion(nil), origReqs...)
+	for i := range reqs[:firstUnoptimizedIndex] {
+		inner := reqs[i].GetInner()
+		if iterKey == nil || bytes.Compare(iterKey, inner.Header().Key) > 0 {
+			switch t := inner.(type) {
 			case *roachpb.PutRequest:
-				t.Blind = true
+				shallow := *t
+				shallow.Blind = true
+				reqs[i].MustSetInner(&shallow)
 			case *roachpb.ConditionalPutRequest:
-				t.Blind = true
+				shallow := *t
+				shallow.Blind = true
+				reqs[i].MustSetInner(&shallow)
 			default:
 				log.Fatalf(context.TODO(), "unexpected non-put request: %s", t)
 			}
 		}
 	}
+	return reqs
 }
 
-// TODO(tschottdorf): Reliance on mutating `ba.Txn` should be dealt with.
 func (r *Replica) executeBatch(
 	ctx context.Context,
 	idKey storagebase.CmdIDKey,
@@ -3437,7 +3449,7 @@ func (r *Replica) executeBatch(
 
 	// Optimize any contiguous sequences of put and conditional put ops.
 	if len(ba.Requests) >= optimizePutThreshold {
-		optimizePuts(batch, ba.Requests, ba.Header.DistinctSpans)
+		ba.Requests = optimizePuts(batch, ba.Requests, ba.Header.DistinctSpans)
 	}
 
 	// Update the node clock with the serviced request. This maintains a high
@@ -3450,6 +3462,14 @@ func (r *Replica) executeBatch(
 		return nil, ProposalData{}, roachpb.NewErrorWithTxn(err, ba.Header.Txn)
 	}
 
+	// Create a shallow clone of the transaction. We only modify a few
+	// non-pointer fields (BatchIndex, WriteTooOld, Timestamp), so this saves
+	// a few allocs.
+	if ba.Txn != nil {
+		txnShallow := *ba.Txn
+		ba.Txn = &txnShallow
+	}
+
 	var pd ProposalData
 	for index, union := range ba.Requests {
 		// Execute the command.
diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go
index 569b73b31ef2..5e3bc0b6305a 100644
--- a/pkg/storage/replica_test.go
+++ b/pkg/storage/replica_test.go
@@ -1501,7 +1501,24 @@ func TestOptimizePuts(t *testing.T) {
 		for _, r := range c.reqs {
 			batch.Add(r)
 		}
-		optimizePuts(tc.engine, batch.Requests, false)
+		// Make a deep clone of the requests slice. We need a deep clone
+		// because the regression which is prevented here changed data on the
+		// individual requests, and not the slice.
+		goldenRequests := append([]roachpb.RequestUnion(nil), batch.Requests...)
+		for i := range goldenRequests {
+			clone := protoutil.Clone(goldenRequests[i].GetInner()).(roachpb.Request)
+			goldenRequests[i].MustSetInner(clone)
+		}
+		// Save the original slice, allowing us to assert that it doesn't
+		// change when it is passed to optimizePuts.
+		oldRequests := batch.Requests
+		batch.Requests = optimizePuts(tc.engine, batch.Requests, false)
+		if !reflect.DeepEqual(goldenRequests, oldRequests) {
+			t.Fatalf("%d: optimizePuts mutated the original request slice: %s",
+				i, pretty.Diff(goldenRequests, oldRequests),
+			)
+		}
+
 		blind := []bool{}
 		for _, r := range batch.Requests {
 			switch t := r.GetInner().(type) {
@@ -6301,3 +6318,37 @@ func TestReplicaTimestampCacheBumpNotLost(t *testing.T) {
 		)
 	}
 }
+
+func TestReplicaEvaluationNotTxnMutation(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	tc := testContext{}
+	tc.Start(t)
+	defer tc.Stop()
+
+	ctx := tc.rng.AnnotateCtx(context.TODO())
+	key := keys.LocalMax
+
+	txn := newTransaction("test", key, 1, enginepb.SERIALIZABLE, tc.Clock())
+	origTxn := txn.Clone()
+
+	var ba roachpb.BatchRequest
+	ba.Txn = txn
+	ba.Timestamp = txn.Timestamp
+	txnPut := putArgs(key, []byte("foo"))
+	// Add two puts (the second one gets BatchIndex 1, which was a failure mode
+	// observed when this test was written and the failure fixed). Originally
+	// observed in #10137, where this became relevant (before that, evaluation
+	// happened downstream of Raft, so a serialization pass always took place).
+	ba.Add(&txnPut)
+	ba.Add(&txnPut)
+
+	batch, _, _, _, pErr := tc.rng.executeWriteBatch(ctx, makeIDKey(), ba)
+	defer batch.Close()
+	if pErr != nil {
+		t.Fatal(pErr)
+	}
+	if !reflect.DeepEqual(&origTxn, txn) {
+		t.Fatalf("transaction was mutated during evaluation: %s", pretty.Diff(&origTxn, txn))
+	}
+}