diff --git a/pkg/internal/client/sender.go b/pkg/internal/client/sender.go index a709fe523d9d..3c31d93edacf 100644 --- a/pkg/internal/client/sender.go +++ b/pkg/internal/client/sender.go @@ -266,7 +266,7 @@ func NewMockTransactionalSender( ) (*roachpb.BatchResponse, *roachpb.Error), txn *roachpb.Transaction, ) *MockTransactionalSender { - return &MockTransactionalSender{senderFunc: f, txn: txn.Clone()} + return &MockTransactionalSender{senderFunc: f, txn: *txn} } // Send is part of the TxnSender interface. @@ -356,8 +356,7 @@ func (m *MockTransactionalSender) Epoch() uint32 { panic("unimplemented") } // SerializeTxn is part of the TxnSender interface. func (m *MockTransactionalSender) SerializeTxn() *roachpb.Transaction { - cp := m.txn.Clone() - return &cp + return m.txn.Clone() } // UpdateStateOnRemoteRetryableErr is part of the TxnSender interface. diff --git a/pkg/internal/client/txn_test.go b/pkg/internal/client/txn_test.go index 62c432b62aa8..387bfa05f615 100644 --- a/pkg/internal/client/txn_test.go +++ b/pkg/internal/client/txn_test.go @@ -120,13 +120,12 @@ func newTestTxnFactory( } } if ba.Txn != nil && br.Txn == nil { - txnClone := ba.Txn.Clone() - br.Txn = &txnClone + br.Txn = ba.Txn.Clone() if pErr == nil { br.Txn.Status = status } // Update the MockTxnSender's proto. - *txn = br.Txn.Clone() + *txn = *br.Txn } return br, pErr }) diff --git a/pkg/kv/dist_sender_test.go b/pkg/kv/dist_sender_test.go index dfce9d2f679b..59daf8e79548 100644 --- a/pkg/kv/dist_sender_test.go +++ b/pkg/kv/dist_sender_test.go @@ -489,8 +489,7 @@ func TestImmutableBatchArgs(t *testing.T) { args roachpb.BatchRequest, ) (*roachpb.BatchResponse, error) { reply := args.CreateReply() - txnClone := args.Txn.Clone() - reply.Txn = &txnClone + reply.Txn = args.Txn.Clone() reply.Txn.Timestamp = hlc.MaxTimestamp return reply, nil } diff --git a/pkg/kv/txn_coord_sender.go b/pkg/kv/txn_coord_sender.go index 788c1b83cc96..4e125c522515 100644 --- a/pkg/kv/txn_coord_sender.go +++ b/pkg/kv/txn_coord_sender.go @@ -604,7 +604,7 @@ func (tc *TxnCoordSender) GetMeta( defer tc.mu.Unlock() // Copy mutable state so access is safe for the caller. var meta roachpb.TxnCoordMeta - meta.Txn = tc.mu.txn.Clone() + meta.Txn = tc.mu.txn for _, reqInt := range tc.interceptorStack { reqInt.populateMetaLocked(&meta) } @@ -690,9 +690,8 @@ func generateTxnDeadlineExceededErr( "txn timestamp pushed too much; deadline exceeded by %s (%s > %s), "+ "original timestamp %s ago (%s)", exceededBy, txn.Timestamp, deadline, fromStart, txn.OrigTimestamp) - txnCpy := txn.Clone() return roachpb.NewErrorWithTxn( - roachpb.NewTransactionRetryError(roachpb.RETRY_COMMIT_DEADLINE_EXCEEDED, extraMsg), &txnCpy) + roachpb.NewTransactionRetryError(roachpb.RETRY_COMMIT_DEADLINE_EXCEEDED, extraMsg), txn) } // commitReadOnlyTxnLocked "commits" a read-only txn. It is equivalent, but @@ -790,8 +789,7 @@ func (tc *TxnCoordSender) Send( } // Clone the Txn's Proto so that future modifications can be made without // worrying about synchronization. - newTxn := tc.mu.txn.Clone() - ba.Txn = &newTxn + ba.Txn = tc.mu.txn.Clone() // Send the command through the txnInterceptor stack. br, pErr := tc.interceptorStack[0].SendLocked(ctx, ba) @@ -1239,6 +1237,5 @@ func (tc *TxnCoordSender) Epoch() uint32 { func (tc *TxnCoordSender) SerializeTxn() *roachpb.Transaction { tc.mu.Lock() defer tc.mu.Unlock() - cpy := tc.mu.txn.Clone() - return &cpy + return tc.mu.txn.Clone() } diff --git a/pkg/kv/txn_coord_sender_test.go b/pkg/kv/txn_coord_sender_test.go index 752a0698886f..ac25b2a556d6 100644 --- a/pkg/kv/txn_coord_sender_test.go +++ b/pkg/kv/txn_coord_sender_test.go @@ -914,8 +914,7 @@ func TestTxnCoordSenderNoDuplicateIntents(t *testing.T) { } } br := ba.CreateReply() - txnClone := ba.Txn.Clone() - br.Txn = &txnClone + br.Txn = ba.Txn.Clone() return br, nil } ambient := log.AmbientContext{Tracer: tracing.NewTracer()} @@ -1261,8 +1260,7 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) { _ context.Context, ba roachpb.BatchRequest, ) (*roachpb.BatchResponse, *roachpb.Error) { br := ba.CreateReply() - txn := ba.Txn.Clone() - br.Txn = &txn + br.Txn = ba.Txn.Clone() if _, hasPut := ba.GetArg(roachpb.Put); hasPut { if _, ok := ba.Requests[0].GetInner().(*roachpb.PutRequest); !ok { @@ -1271,8 +1269,7 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) { union := &br.Responses[0] // avoid operating on copy union.MustSetInner(&roachpb.PutResponse{}) if ba.Txn != nil && br.Txn == nil { - txnClone := ba.Txn.Clone() - br.Txn = &txnClone + br.Txn = ba.Txn.Clone() br.Txn.Status = roachpb.PENDING } } else if et, hasET := ba.GetArg(roachpb.EndTransaction); hasET { @@ -1353,8 +1350,7 @@ func (s *mockSender) Send( } // If none of the matchers triggered, just create an empty reply. br := ba.CreateReply() - txn := ba.Txn.Clone() - br.Txn = &txn + br.Txn = ba.Txn.Clone() return br, nil } @@ -1473,8 +1469,7 @@ func TestOnePCErrorTracking(t *testing.T) { resp := ba.CreateReply() // Set the response's txn to the Aborted status (as the server would). This // will make the TxnCoordSender stop the heartbeat loop. - txnCopy := ba.Txn.Clone() - resp.Txn = &txnCopy + resp.Txn = ba.Txn.Clone() resp.Txn.Status = roachpb.ABORTED return resp, nil }) @@ -1937,8 +1932,7 @@ func TestTransactionKeyNotChangedInRestart(t *testing.T) { // Ignore the final EndTxnRequest. if _, ok := ba.GetArg(roachpb.EndTransaction); ok { br := ba.CreateReply() - txn := ba.Txn.Clone() - br.Txn = &txn + br.Txn = ba.Txn.Clone() return nil, nil } @@ -2134,9 +2128,8 @@ func TestTxnRequestTxnTimestamp(t *testing.T) { curReq, req.expRequestTS, ba.Txn.Timestamp) } - txnClone := ba.Txn.Clone() br := ba.CreateReply() - br.Txn = &txnClone + br.Txn = ba.Txn.Clone() br.Txn.Timestamp.Forward(requests[curReq].responseTS) return br, nil }) @@ -2171,9 +2164,8 @@ func TestReadOnlyTxnObeysDeadline(t *testing.T) { sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) { if _, ok := ba.GetArg(roachpb.Get); ok { manual.Increment(100) - txnClone := ba.Txn.Clone() br := ba.CreateReply() - br.Txn = &txnClone + br.Txn = ba.Txn.Clone() br.Txn.Timestamp.Forward(clock.Now()) return br, nil } @@ -2310,8 +2302,7 @@ func TestAnchorKey(t *testing.T) { t.Fatalf("expected anchor %q, got %q", key2, ba.Txn.Key) } br := ba.CreateReply() - txn := ba.Txn.Clone() - br.Txn = &txn + br.Txn = ba.Txn.Clone() return br, nil } @@ -2353,8 +2344,8 @@ func TestLeafTxnClientRejectError(t *testing.T) { txn := ba.Txn.Clone() txn.Status = roachpb.ABORTED return roachpb.NewErrorWithTxn( - roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_UNKNOWN), - &txn) + roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_UNKNOWN), txn, + ) } return nil }, diff --git a/pkg/kv/txn_interceptor_committer.go b/pkg/kv/txn_interceptor_committer.go index a67ec5001b56..9eb77edbbe17 100644 --- a/pkg/kv/txn_interceptor_committer.go +++ b/pkg/kv/txn_interceptor_committer.go @@ -131,7 +131,6 @@ func (tc *txnCommitter) closeLocked() {} func cloneWithStatus(txn *roachpb.Transaction, s roachpb.TransactionStatus) *roachpb.Transaction { clone := txn.Clone() - txn = &clone - txn.Status = s - return txn + clone.Status = s + return clone } diff --git a/pkg/kv/txn_interceptor_heartbeater.go b/pkg/kv/txn_interceptor_heartbeater.go index 86e8998c9f27..8dc20341cfcc 100644 --- a/pkg/kv/txn_interceptor_heartbeater.go +++ b/pkg/kv/txn_interceptor_heartbeater.go @@ -381,7 +381,7 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool { } ba := roachpb.BatchRequest{} - ba.Txn = &txn + ba.Txn = txn hb := &roachpb.HeartbeatTxnRequest{ RequestHeader: roachpb.RequestHeader{ @@ -464,7 +464,7 @@ func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) { // Construct a batch with an EndTransaction request. ba := roachpb.BatchRequest{} - ba.Header = roachpb.Header{Txn: &txn} + ba.Header = roachpb.Header{Txn: txn} ba.Add(&roachpb.EndTransactionRequest{ Commit: false, // Resolved intents should maintain an abort span entry to prevent diff --git a/pkg/roachpb/api.go b/pkg/roachpb/api.go index 1d0a6a133110..84cc6cd400b8 100644 --- a/pkg/roachpb/api.go +++ b/pkg/roachpb/api.go @@ -385,8 +385,7 @@ func (h *BatchResponse_Header) combine(o BatchResponse_Header) error { h.Timestamp.Forward(o.Timestamp) if txn := o.Txn; txn != nil { if h.Txn == nil { - txnClone := txn.Clone() - h.Txn = &txnClone + h.Txn = txn.Clone() } else { h.Txn.Update(txn) } diff --git a/pkg/roachpb/batch.go b/pkg/roachpb/batch.go index 38254d025741..c5c0624ae979 100644 --- a/pkg/roachpb/batch.go +++ b/pkg/roachpb/batch.go @@ -83,7 +83,7 @@ func (ba *BatchRequest) UpdateTxn(o *Transaction) { } clonedTxn := ba.Txn.Clone() clonedTxn.Update(o) - ba.Txn = &clonedTxn + ba.Txn = clonedTxn } // IsLeaseRequest returns whether the batch consists of a single RequestLease diff --git a/pkg/roachpb/data.go b/pkg/roachpb/data.go index 64bf1f37ee3b..6bb45cf558bb 100644 --- a/pkg/roachpb/data.go +++ b/pkg/roachpb/data.go @@ -813,19 +813,10 @@ func (t Transaction) LastActive() hlc.Timestamp { return ts } -// Clone creates a copy of the given transaction. The copy is "mostly" deep, -// but does share pieces of memory with the original such as Key, ID and the -// keys with the intent spans. -func (t Transaction) Clone() Transaction { - mt := t.ObservedTimestamps - if mt != nil { - t.ObservedTimestamps = make([]ObservedTimestamp, len(mt)) - copy(t.ObservedTimestamps, mt) - } - // Note that we're not cloning the span keys under the assumption that the - // keys themselves are not mutable. - t.Intents = append([]Span(nil), t.Intents...) - return t +// Clone creates a copy of the given transaction. The copy is shallow because +// none of the references held by a transaction allow interior mutability. +func (t Transaction) Clone() *Transaction { + return &t } // AssertInitialized crashes if the transaction is not initialized. @@ -979,7 +970,7 @@ func (t *Transaction) Update(o *Transaction) { } o.AssertInitialized(context.TODO()) if t.ID == (uuid.UUID{}) { - *t = o.Clone() + *t = *o return } if len(t.Key) == 0 { @@ -1174,7 +1165,7 @@ func PrepareTransactionForRetry( log.Fatalf(ctx, "missing txn for retryable error: %s", pErr) } - txn := pErr.GetTxn().Clone() + txn := *pErr.GetTxn() aborted := false switch tErr := pErr.GetDetail().(type) { case *TransactionAbortedError: @@ -1259,7 +1250,7 @@ func CanTransactionRetryAtRefreshedTimestamp( newTxn.RefreshedTimestamp.Forward(newTxn.Timestamp) newTxn.WriteTooOld = false - return true, &newTxn + return true, newTxn } func readWithinUncertaintyIntervalRetryTimestamp( @@ -1731,7 +1722,8 @@ func (kv KeyValueByKey) Swap(i, j int) { var _ sort.Interface = KeyValueByKey{} -// observedTimestampSlice maintains a sorted list of observed timestamps. +// observedTimestampSlice maintains an immutable sorted list of observed +// timestamps. type observedTimestampSlice []ObservedTimestamp func (s observedTimestampSlice) index(nodeID NodeID) int { @@ -1753,19 +1745,27 @@ func (s observedTimestampSlice) get(nodeID NodeID) (hlc.Timestamp, bool) { } // update the timestamp for the specified node, or add a new entry in the -// correct (sorted) location. +// correct (sorted) location. The receiver is not mutated. func (s observedTimestampSlice) update( nodeID NodeID, timestamp hlc.Timestamp, ) observedTimestampSlice { i := s.index(nodeID) if i < len(s) && s[i].NodeID == nodeID { if timestamp.Less(s[i].Timestamp) { - s[i].Timestamp = timestamp + // The input slice is immutable, so copy and update. + cpy := make(observedTimestampSlice, len(s)) + copy(cpy, s) + cpy[i].Timestamp = timestamp + return cpy } return s } - s = append(s, ObservedTimestamp{}) - copy(s[i+1:], s[i:]) - s[i] = ObservedTimestamp{NodeID: nodeID, Timestamp: timestamp} - return s + // The input slice is immutable, so copy and update. Don't append to + // avoid an allocation. Doing so could invalidate a previous update + // to this receiver. + cpy := make(observedTimestampSlice, len(s)+1) + copy(cpy[:i], s[:i]) + cpy[i] = ObservedTimestamp{NodeID: nodeID, Timestamp: timestamp} + copy(cpy[i+1:], s[i:]) + return cpy } diff --git a/pkg/roachpb/data.pb.go b/pkg/roachpb/data.pb.go index a8c343562d5c..4c553a37e909 100644 --- a/pkg/roachpb/data.pb.go +++ b/pkg/roachpb/data.pb.go @@ -86,7 +86,7 @@ func (x ValueType) String() string { return proto.EnumName(ValueType_name, int32(x)) } func (ValueType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_data_cad45947c9080524, []int{0} + return fileDescriptor_data_a87162148e6fc734, []int{0} } // ReplicaChangeType is a parameter of ChangeReplicasTrigger. @@ -110,7 +110,7 @@ func (x ReplicaChangeType) String() string { return proto.EnumName(ReplicaChangeType_name, int32(x)) } func (ReplicaChangeType) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_data_cad45947c9080524, []int{1} + return fileDescriptor_data_a87162148e6fc734, []int{1} } // TransactionStatus specifies possible states for a transaction. @@ -150,7 +150,7 @@ func (x TransactionStatus) String() string { return proto.EnumName(TransactionStatus_name, int32(x)) } func (TransactionStatus) EnumDescriptor() ([]byte, []int) { - return fileDescriptor_data_cad45947c9080524, []int{2} + return fileDescriptor_data_a87162148e6fc734, []int{2} } // Span is a key range with an inclusive start Key and an exclusive end Key. @@ -169,7 +169,7 @@ type Span struct { func (m *Span) Reset() { *m = Span{} } func (*Span) ProtoMessage() {} func (*Span) Descriptor() ([]byte, []int) { - return fileDescriptor_data_cad45947c9080524, []int{0} + return fileDescriptor_data_a87162148e6fc734, []int{0} } func (m *Span) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -221,7 +221,7 @@ func (m *Value) Reset() { *m = Value{} } func (m *Value) String() string { return proto.CompactTextString(m) } func (*Value) ProtoMessage() {} func (*Value) Descriptor() ([]byte, []int) { - return fileDescriptor_data_cad45947c9080524, []int{1} + return fileDescriptor_data_a87162148e6fc734, []int{1} } func (m *Value) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -259,7 +259,7 @@ func (m *KeyValue) Reset() { *m = KeyValue{} } func (m *KeyValue) String() string { return proto.CompactTextString(m) } func (*KeyValue) ProtoMessage() {} func (*KeyValue) Descriptor() ([]byte, []int) { - return fileDescriptor_data_cad45947c9080524, []int{2} + return fileDescriptor_data_a87162148e6fc734, []int{2} } func (m *KeyValue) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -299,7 +299,7 @@ func (m *StoreIdent) Reset() { *m = StoreIdent{} } func (m *StoreIdent) String() string { return proto.CompactTextString(m) } func (*StoreIdent) ProtoMessage() {} func (*StoreIdent) Descriptor() ([]byte, []int) { - return fileDescriptor_data_cad45947c9080524, []int{3} + return fileDescriptor_data_a87162148e6fc734, []int{3} } func (m *StoreIdent) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -341,7 +341,7 @@ func (m *SplitTrigger) Reset() { *m = SplitTrigger{} } func (m *SplitTrigger) String() string { return proto.CompactTextString(m) } func (*SplitTrigger) ProtoMessage() {} func (*SplitTrigger) Descriptor() ([]byte, []int) { - return fileDescriptor_data_cad45947c9080524, []int{4} + return fileDescriptor_data_a87162148e6fc734, []int{4} } func (m *SplitTrigger) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -391,7 +391,7 @@ func (m *MergeTrigger) Reset() { *m = MergeTrigger{} } func (m *MergeTrigger) String() string { return proto.CompactTextString(m) } func (*MergeTrigger) ProtoMessage() {} func (*MergeTrigger) Descriptor() ([]byte, []int) { - return fileDescriptor_data_cad45947c9080524, []int{5} + return fileDescriptor_data_a87162148e6fc734, []int{5} } func (m *MergeTrigger) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -430,7 +430,7 @@ type ChangeReplicasTrigger struct { func (m *ChangeReplicasTrigger) Reset() { *m = ChangeReplicasTrigger{} } func (*ChangeReplicasTrigger) ProtoMessage() {} func (*ChangeReplicasTrigger) Descriptor() ([]byte, []int) { - return fileDescriptor_data_cad45947c9080524, []int{6} + return fileDescriptor_data_a87162148e6fc734, []int{6} } func (m *ChangeReplicasTrigger) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -474,7 +474,7 @@ func (m *ModifiedSpanTrigger) Reset() { *m = ModifiedSpanTrigger{} } func (m *ModifiedSpanTrigger) String() string { return proto.CompactTextString(m) } func (*ModifiedSpanTrigger) ProtoMessage() {} func (*ModifiedSpanTrigger) Descriptor() ([]byte, []int) { - return fileDescriptor_data_cad45947c9080524, []int{7} + return fileDescriptor_data_a87162148e6fc734, []int{7} } func (m *ModifiedSpanTrigger) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -514,7 +514,7 @@ func (m *InternalCommitTrigger) Reset() { *m = InternalCommitTrigger{} } func (m *InternalCommitTrigger) String() string { return proto.CompactTextString(m) } func (*InternalCommitTrigger) ProtoMessage() {} func (*InternalCommitTrigger) Descriptor() ([]byte, []int) { - return fileDescriptor_data_cad45947c9080524, []int{8} + return fileDescriptor_data_a87162148e6fc734, []int{8} } func (m *InternalCommitTrigger) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -578,7 +578,7 @@ func (m *ObservedTimestamp) Reset() { *m = ObservedTimestamp{} } func (m *ObservedTimestamp) String() string { return proto.CompactTextString(m) } func (*ObservedTimestamp) ProtoMessage() {} func (*ObservedTimestamp) Descriptor() ([]byte, []int) { - return fileDescriptor_data_cad45947c9080524, []int{9} + return fileDescriptor_data_a87162148e6fc734, []int{9} } func (m *ObservedTimestamp) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -688,8 +688,10 @@ type Transaction struct { // clock, we may add that to the map, which eliminates read uncertainty for // reads on that node. // - // The list of observed timestamps is kept sorted by NodeID. Use - // Transaction.UpdateObservedTimestamp to maintain the sorted order. + // The slice of observed timestamps is kept sorted by NodeID. Use + // Transaction.UpdateObservedTimestamp to maintain the sorted order. The + // slice should be treated as immutable and all updates should be performed + // on a copy of the slice. ObservedTimestamps []ObservedTimestamp `protobuf:"bytes,8,rep,name=observed_timestamps,json=observedTimestamps,proto3" json:"observed_timestamps"` // Writing is true if the transaction has previously sent a Begin transaction // (i.e. if it ever attempted to perform a write, so if it ever attempted to @@ -707,8 +709,10 @@ type Transaction struct { // This bool is set instead of immediately returning a txn retry // error so that intents can continue to be laid down, minimizing // work required on txn restart. - WriteTooOld bool `protobuf:"varint,12,opt,name=write_too_old,json=writeTooOld,proto3" json:"write_too_old,omitempty"` - Intents []Span `protobuf:"bytes,11,rep,name=intents,proto3" json:"intents"` + WriteTooOld bool `protobuf:"varint,12,opt,name=write_too_old,json=writeTooOld,proto3" json:"write_too_old,omitempty"` + // The slice should be treated as immutable and all updates should be + // performed on a copy of the slice. + Intents []Span `protobuf:"bytes,11,rep,name=intents,proto3" json:"intents"` // Epoch zero timestamp is used to keep track of the earliest timestamp // that any epoch of the transaction used. This is set only if the // transaction is restarted and the epoch is bumped. It is used during @@ -727,7 +731,7 @@ type Transaction struct { func (m *Transaction) Reset() { *m = Transaction{} } func (*Transaction) ProtoMessage() {} func (*Transaction) Descriptor() ([]byte, []int) { - return fileDescriptor_data_cad45947c9080524, []int{10} + return fileDescriptor_data_a87162148e6fc734, []int{10} } func (m *Transaction) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -783,7 +787,7 @@ func (m *TransactionRecord) Reset() { *m = TransactionRecord{} } func (m *TransactionRecord) String() string { return proto.CompactTextString(m) } func (*TransactionRecord) ProtoMessage() {} func (*TransactionRecord) Descriptor() ([]byte, []int) { - return fileDescriptor_data_cad45947c9080524, []int{11} + return fileDescriptor_data_a87162148e6fc734, []int{11} } func (m *TransactionRecord) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -821,7 +825,7 @@ func (m *Intent) Reset() { *m = Intent{} } func (m *Intent) String() string { return proto.CompactTextString(m) } func (*Intent) ProtoMessage() {} func (*Intent) Descriptor() ([]byte, []int) { - return fileDescriptor_data_cad45947c9080524, []int{12} + return fileDescriptor_data_a87162148e6fc734, []int{12} } func (m *Intent) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -864,7 +868,7 @@ func (m *SequencedWrite) Reset() { *m = SequencedWrite{} } func (m *SequencedWrite) String() string { return proto.CompactTextString(m) } func (*SequencedWrite) ProtoMessage() {} func (*SequencedWrite) Descriptor() ([]byte, []int) { - return fileDescriptor_data_cad45947c9080524, []int{13} + return fileDescriptor_data_a87162148e6fc734, []int{13} } func (m *SequencedWrite) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -930,7 +934,7 @@ type Lease struct { func (m *Lease) Reset() { *m = Lease{} } func (*Lease) ProtoMessage() {} func (*Lease) Descriptor() ([]byte, []int) { - return fileDescriptor_data_cad45947c9080524, []int{14} + return fileDescriptor_data_a87162148e6fc734, []int{14} } func (m *Lease) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -977,7 +981,7 @@ func (m *AbortSpanEntry) Reset() { *m = AbortSpanEntry{} } func (m *AbortSpanEntry) String() string { return proto.CompactTextString(m) } func (*AbortSpanEntry) ProtoMessage() {} func (*AbortSpanEntry) Descriptor() ([]byte, []int) { - return fileDescriptor_data_cad45947c9080524, []int{15} + return fileDescriptor_data_a87162148e6fc734, []int{15} } func (m *AbortSpanEntry) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -1048,7 +1052,7 @@ func (m *TxnCoordMeta) Reset() { *m = TxnCoordMeta{} } func (m *TxnCoordMeta) String() string { return proto.CompactTextString(m) } func (*TxnCoordMeta) ProtoMessage() {} func (*TxnCoordMeta) Descriptor() ([]byte, []int) { - return fileDescriptor_data_cad45947c9080524, []int{16} + return fileDescriptor_data_a87162148e6fc734, []int{16} } func (m *TxnCoordMeta) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -5762,9 +5766,9 @@ var ( ErrIntOverflowData = fmt.Errorf("proto: integer overflow") ) -func init() { proto.RegisterFile("roachpb/data.proto", fileDescriptor_data_cad45947c9080524) } +func init() { proto.RegisterFile("roachpb/data.proto", fileDescriptor_data_a87162148e6fc734) } -var fileDescriptor_data_cad45947c9080524 = []byte{ +var fileDescriptor_data_a87162148e6fc734 = []byte{ // 1947 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xdc, 0x58, 0xcf, 0x6f, 0xdb, 0xc8, 0xf5, 0x37, 0x45, 0x4a, 0xa2, 0x9e, 0x7e, 0x98, 0x9e, 0xc4, 0x89, 0xbe, 0x59, 0x7c, 0xa5, 0xac, diff --git a/pkg/roachpb/data.proto b/pkg/roachpb/data.proto index 34480f0a4aef..a1674491dc69 100644 --- a/pkg/roachpb/data.proto +++ b/pkg/roachpb/data.proto @@ -327,8 +327,10 @@ message Transaction { // clock, we may add that to the map, which eliminates read uncertainty for // reads on that node. // - // The list of observed timestamps is kept sorted by NodeID. Use - // Transaction.UpdateObservedTimestamp to maintain the sorted order. + // The slice of observed timestamps is kept sorted by NodeID. Use + // Transaction.UpdateObservedTimestamp to maintain the sorted order. The + // slice should be treated as immutable and all updates should be performed + // on a copy of the slice. repeated ObservedTimestamp observed_timestamps = 8 [(gogoproto.nullable) = false]; // Writing is true if the transaction has previously sent a Begin transaction // (i.e. if it ever attempted to perform a write, so if it ever attempted to @@ -347,6 +349,8 @@ message Transaction { // error so that intents can continue to be laid down, minimizing // work required on txn restart. bool write_too_old = 12; + // The slice should be treated as immutable and all updates should be + // performed on a copy of the slice. repeated Span intents = 11 [(gogoproto.nullable) = false]; // Epoch zero timestamp is used to keep track of the earliest timestamp // that any epoch of the transaction used. This is set only if the diff --git a/pkg/roachpb/data_test.go b/pkg/roachpb/data_test.go index f5e4fd5609a7..ba0396dfe757 100644 --- a/pkg/roachpb/data_test.go +++ b/pkg/roachpb/data_test.go @@ -538,7 +538,8 @@ func TestTransactionUpdateEpochZero(t *testing.T) { } func TestTransactionClone(t *testing.T) { - txn := nonZeroTxn.Clone() + txnPtr := nonZeroTxn.Clone() + txn := *txnPtr fields := util.EqualPtrFields(reflect.ValueOf(nonZeroTxn), reflect.ValueOf(txn), "") sort.Strings(fields) @@ -547,8 +548,10 @@ func TestTransactionClone(t *testing.T) { // listed below. If this test fails, please update the list below and/or // Transaction.Clone(). expFields := []string{ + "Intents", "Intents.EndKey", "Intents.Key", + "ObservedTimestamps", "TxnMeta.Key", } if !reflect.DeepEqual(expFields, fields) { diff --git a/pkg/roachpb/errors.go b/pkg/roachpb/errors.go index 9574bf8eaf48..613eec056a2b 100644 --- a/pkg/roachpb/errors.go +++ b/pkg/roachpb/errors.go @@ -169,8 +169,7 @@ func (e *Error) GoError() error { func (e *Error) SetTxn(txn *Transaction) { e.UnexposedTxn = txn if txn != nil { - txnClone := txn.Clone() - e.UnexposedTxn = &txnClone + e.UnexposedTxn = txn.Clone() } if sErr, ok := e.Detail.GetInner().(ErrorDetailInterface); ok { // Refresh the message as the txn is updated. @@ -391,7 +390,7 @@ func (e *TransactionRetryWithProtoRefreshError) PrevTxnAborted() bool { func NewTransactionPushError(pusheeTxn Transaction) *TransactionPushError { // Note: this error will cause a txn restart. The error that the client // receives contains a txn that might have a modified priority. - return &TransactionPushError{PusheeTxn: pusheeTxn.Clone()} + return &TransactionPushError{PusheeTxn: pusheeTxn} } func (e *TransactionPushError) Error() string { @@ -533,7 +532,7 @@ func NewReadWithinUncertaintyIntervalError( if txn != nil { maxTS := txn.MaxTimestamp rwue.MaxTimestamp = &maxTS - rwue.ObservedTimestamps = append([]ObservedTimestamp(nil), txn.ObservedTimestamps...) + rwue.ObservedTimestamps = txn.ObservedTimestamps } return rwue } diff --git a/pkg/sql/as_of_test.go b/pkg/sql/as_of_test.go index 366ac792a5c8..f27b0d41d30d 100644 --- a/pkg/sql/as_of_test.go +++ b/pkg/sql/as_of_test.go @@ -307,7 +307,7 @@ func TestAsOfRetry(t *testing.T) { failureRecord{err, args.Hdr.Txn} txn := args.Hdr.Txn.Clone() txn.Timestamp = txn.Timestamp.Add(0, 1) - return roachpb.NewErrorWithTxn(err, &txn) + return roachpb.NewErrorWithTxn(err, txn) } } } diff --git a/pkg/sql/txn_restart_test.go b/pkg/sql/txn_restart_test.go index 410df9dec491..31911965b039 100644 --- a/pkg/sql/txn_restart_test.go +++ b/pkg/sql/txn_restart_test.go @@ -1479,7 +1479,7 @@ func TestDistSQLRetryableError(t *testing.T) { nil) errTxn := fArgs.Hdr.Txn.Clone() errTxn.UpdateObservedTimestamp(roachpb.NodeID(2), hlc.Timestamp{}) - pErr := roachpb.NewErrorWithTxn(err, &errTxn) + pErr := roachpb.NewErrorWithTxn(err, errTxn) pErr.OriginNode = 2 return pErr } diff --git a/pkg/storage/batcheval/cmd_begin_transaction.go b/pkg/storage/batcheval/cmd_begin_transaction.go index e7758183ed9a..a88345b97d82 100644 --- a/pkg/storage/batcheval/cmd_begin_transaction.go +++ b/pkg/storage/batcheval/cmd_begin_transaction.go @@ -69,8 +69,7 @@ func BeginTransaction( return result.Result{}, err } key := keys.TransactionKey(h.Txn.Key, h.Txn.ID) - clonedTxn := h.Txn.Clone() - reply.Txn = &clonedTxn + reply.Txn = h.Txn.Clone() // Check whether the transaction record already exists. If it already // exists, check its current status and react accordingly. diff --git a/pkg/storage/batcheval/cmd_end_transaction.go b/pkg/storage/batcheval/cmd_end_transaction.go index 00dbb36f2bd6..e340bac373e6 100644 --- a/pkg/storage/batcheval/cmd_end_transaction.go +++ b/pkg/storage/batcheval/cmd_end_transaction.go @@ -179,8 +179,7 @@ func evalEndTransaction( } else if !ok { // No existing transaction record was found - create one by writing it // below in updateTxnWithExternalIntents. - txn := h.Txn.Clone() - reply.Txn = &txn + reply.Txn = h.Txn.Clone() // Verify that it is safe to create the transaction record. We only need // to perform this verification for commits. Rollbacks can always write diff --git a/pkg/storage/batcheval/cmd_heartbeat_txn.go b/pkg/storage/batcheval/cmd_heartbeat_txn.go index 141bc0079ca5..b22b4fc3f5fe 100644 --- a/pkg/storage/batcheval/cmd_heartbeat_txn.go +++ b/pkg/storage/batcheval/cmd_heartbeat_txn.go @@ -64,7 +64,7 @@ func HeartbeatTxn( } else if !ok { // No existing transaction record was found - create one by writing // it below. - txn = h.Txn.Clone() + txn = *h.Txn if txn.Status != roachpb.PENDING { return result.Result{}, roachpb.NewTransactionStatusError( fmt.Sprintf("cannot heartbeat txn with status %v: %s", txn.Status, txn), diff --git a/pkg/storage/batcheval/cmd_push_txn.go b/pkg/storage/batcheval/cmd_push_txn.go index 03bdbbb57b01..fb1bb11d62ef 100644 --- a/pkg/storage/batcheval/cmd_push_txn.go +++ b/pkg/storage/batcheval/cmd_push_txn.go @@ -126,8 +126,8 @@ func PushTxn( key := keys.TransactionKey(args.PusheeTxn.Key, args.PusheeTxn.ID) // Fetch existing transaction; if missing, we're allowed to abort. - existTxn := &roachpb.Transaction{} - ok, err := engine.MVCCGetProto(ctx, batch, key, hlc.Timestamp{}, existTxn, engine.MVCCGetOptions{}) + var existTxn roachpb.Transaction + ok, err := engine.MVCCGetProto(ctx, batch, key, hlc.Timestamp{}, &existTxn, engine.MVCCGetOptions{}) if err != nil { return result.Result{}, err } else if !ok { @@ -173,7 +173,7 @@ func PushTxn( } } else { // Start with the persisted transaction record. - reply.PusheeTxn = existTxn.Clone() + reply.PusheeTxn = existTxn // Forward the last heartbeat time of the transaction record by // the timestamp of the intent. This is another indication of diff --git a/pkg/storage/batcheval/cmd_query_intent.go b/pkg/storage/batcheval/cmd_query_intent.go index 58485b566ac9..ab0913ffc897 100644 --- a/pkg/storage/batcheval/cmd_query_intent.go +++ b/pkg/storage/batcheval/cmd_query_intent.go @@ -78,8 +78,7 @@ func QueryIntent( // If the request was querying an intent in its own transaction, update // the response transaction. if ownTxn { - clonedTxn := h.Txn.Clone() - reply.Txn = &clonedTxn + reply.Txn = h.Txn.Clone() reply.Txn.Timestamp.Forward(intent.Txn.Timestamp) } } diff --git a/pkg/storage/client_test.go b/pkg/storage/client_test.go index 99660e09c08e..e822d31b62f0 100644 --- a/pkg/storage/client_test.go +++ b/pkg/storage/client_test.go @@ -544,8 +544,7 @@ func (t *multiTestContextKVTransport) SendNext( // Clone txn of ba args for sending. ba.Replica = rep.ReplicaDescriptor if txn := ba.Txn; txn != nil { - txnClone := ba.Txn.Clone() - ba.Txn = &txnClone + ba.Txn = ba.Txn.Clone() } var br *roachpb.BatchResponse var pErr *roachpb.Error diff --git a/pkg/storage/engine/mvcc.go b/pkg/storage/engine/mvcc.go index 7e9b28041d73..9e3044feff08 100644 --- a/pkg/storage/engine/mvcc.go +++ b/pkg/storage/engine/mvcc.go @@ -1872,7 +1872,7 @@ func MVCCDeleteRange( if txn != nil { prevSeqTxn := txn.Clone() prevSeqTxn.Sequence-- - scanTxn = &prevSeqTxn + scanTxn = prevSeqTxn } kvs, resumeSpan, _, err := MVCCScan( ctx, engine, key, endKey, max, scanTs, MVCCScanOptions{Txn: scanTxn}) diff --git a/pkg/storage/engine/mvcc_stats_test.go b/pkg/storage/engine/mvcc_stats_test.go index 37915ba982c5..2f3f945e8a56 100644 --- a/pkg/storage/engine/mvcc_stats_test.go +++ b/pkg/storage/engine/mvcc_stats_test.go @@ -584,11 +584,13 @@ func TestMVCCStatsDelDelCommitMovesTimestamp(t *testing.T) { aggMS := *aggMS engine := engine.NewBatch() defer engine.Close() - txn := txn.Clone() - txn.Status = roachpb.COMMITTED - txn.Timestamp.Forward(ts3) - if err := MVCCResolveWriteIntent(ctx, engine, &aggMS, roachpb.Intent{Span: roachpb.Span{Key: key}, Status: txn.Status, Txn: txn.TxnMeta}); err != nil { + txnCommit := txn.Clone() + txnCommit.Status = roachpb.COMMITTED + txnCommit.Timestamp.Forward(ts3) + if err := MVCCResolveWriteIntent(ctx, engine, &aggMS, roachpb.Intent{ + Span: roachpb.Span{Key: key}, Status: txnCommit.Status, Txn: txnCommit.TxnMeta, + }); err != nil { t.Fatal(err) } @@ -612,12 +614,11 @@ func TestMVCCStatsDelDelCommitMovesTimestamp(t *testing.T) { engine := engine.NewBatch() defer engine.Close() - txn := txn.Clone() - - txn.Status = roachpb.ABORTED - txn.Timestamp.Forward(ts3) + txnAbort := txn.Clone() + txnAbort.Status = roachpb.ABORTED + txnAbort.Timestamp.Forward(ts3) if err := MVCCResolveWriteIntent(ctx, engine, &aggMS, roachpb.Intent{ - Span: roachpb.Span{Key: key}, Status: txn.Status, Txn: txn.TxnMeta, + Span: roachpb.Span{Key: key}, Status: txnAbort.Status, Txn: txnAbort.TxnMeta, }); err != nil { t.Fatal(err) } @@ -739,11 +740,11 @@ func TestMVCCStatsPutDelPutMovesTimestamp(t *testing.T) { aggMS := *aggMS engine := engine.NewBatch() defer engine.Close() - txn := txn.Clone() - txn.Status = roachpb.ABORTED // doesn't change m2ValSize, fortunately + txnAbort := txn.Clone() + txnAbort.Status = roachpb.ABORTED // doesn't change m2ValSize, fortunately if err := MVCCResolveWriteIntent(ctx, engine, &aggMS, roachpb.Intent{ - Span: roachpb.Span{Key: key}, Status: txn.Status, Txn: txn.TxnMeta, + Span: roachpb.Span{Key: key}, Status: txnAbort.Status, Txn: txnAbort.TxnMeta, }); err != nil { t.Fatal(err) } diff --git a/pkg/storage/engine/mvcc_test.go b/pkg/storage/engine/mvcc_test.go index 9720a2eb3415..ccdc8b6cfded 100644 --- a/pkg/storage/engine/mvcc_test.go +++ b/pkg/storage/engine/mvcc_test.go @@ -90,7 +90,7 @@ func makeTxn(baseTxn roachpb.Transaction, ts hlc.Timestamp) *roachpb.Transaction txn := baseTxn.Clone() txn.OrigTimestamp = ts txn.Timestamp = ts - return &txn + return txn } type mvccKeys []MVCCKey @@ -3049,7 +3049,7 @@ func TestMVCCResolveIntentTxnTimestampMismatch(t *testing.T) { txn.TxnMeta.Timestamp.Forward(tsEarly.Add(10, 0)) // Write an intent which has txn.Timestamp > meta.timestamp. - if err := MVCCPut(ctx, engine, nil, testKey1, tsEarly, value1, &txn); err != nil { + if err := MVCCPut(ctx, engine, nil, testKey1, tsEarly, value1, txn); err != nil { t.Fatal(err) } diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 4cb52d9e12de..b4601a143630 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -1312,14 +1312,14 @@ func (r *Replica) limitTxnMaxTimestamp( } if obsTS.Less(ba.Txn.MaxTimestamp) { // Copy-on-write to protect others we might be sharing the Txn with. - shallowTxn := *ba.Txn + txnClone := ba.Txn.Clone() // The uncertainty window is [OrigTimestamp, maxTS), so if that window // is empty, there won't be any uncertainty restarts. if !ba.Txn.OrigTimestamp.Less(obsTS) { log.Event(ctx, "read has no clock uncertainty") } - shallowTxn.MaxTimestamp.Backward(obsTS) - ba.Txn = &shallowTxn + txnClone.MaxTimestamp.Backward(obsTS) + ba.Txn = txnClone } } @@ -1567,7 +1567,7 @@ func checkIfTxnAborted( } newTxn.Status = roachpb.ABORTED return roachpb.NewErrorWithTxn( - roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_ABORT_SPAN), &newTxn) + roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_ABORT_SPAN), newTxn) } return nil } diff --git a/pkg/storage/replica_evaluate.go b/pkg/storage/replica_evaluate.go index 77c1302fb678..1eb5d38f532c 100644 --- a/pkg/storage/replica_evaluate.go +++ b/pkg/storage/replica_evaluate.go @@ -162,14 +162,10 @@ func evaluateBatch( ba.Requests = optimizePuts(batch, ba.Requests, ba.Header.DistinctSpans) } - // Create a shallow clone of the transaction to store the new txn - // state produced on the return/error path. We use a shallow clone - // because we only modify a few non-pointer fields (Sequence, - // DeprecatedBatchIndex, WriteTooOld, Timestamp): a shallow clone saves a - // few allocs. + // Create a clone of the transaction to store the new txn state produced on + // the return/error path. if ba.Txn != nil { - txnShallow := *ba.Txn - ba.Txn = &txnShallow + ba.Txn = ba.Txn.Clone() // Check whether this transaction has been aborted, if applicable. // This applies to writes that leave intents (the use of the diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index 14e2d76bc745..6957b3756e1a 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -3994,9 +3994,9 @@ func TestBatchRetryCantCommitIntents(t *testing.T) { var ba2 roachpb.BatchRequest putB := putArgs(keyB, []byte("value")) putTxn := br.Txn.Clone() - ba2.Header = roachpb.Header{Txn: &putTxn} + ba2.Header = roachpb.Header{Txn: putTxn} ba2.Add(&putB) - assignSeqNumsForReqs(&putTxn, &putB) + assignSeqNumsForReqs(putTxn, &putB) br, pErr = tc.Sender().Send(context.Background(), ba2) if pErr != nil { t.Fatalf("unexpected error: %s", pErr) @@ -4004,16 +4004,16 @@ func TestBatchRetryCantCommitIntents(t *testing.T) { // HeartbeatTxn. hbTxn := br.Txn.Clone() - hb, hbH := heartbeatArgs(&hbTxn, tc.Clock().Now()) + hb, hbH := heartbeatArgs(hbTxn, tc.Clock().Now()) if _, pErr := tc.SendWrappedWith(hbH, &hb); pErr != nil { t.Fatalf("unexpected error: %s", pErr) } // EndTransaction. etTxn := br.Txn.Clone() - et, etH := endTxnArgs(&etTxn, true) + et, etH := endTxnArgs(etTxn, true) et.IntentSpans = []roachpb.Span{{Key: key, EndKey: nil}, {Key: keyB, EndKey: nil}} - assignSeqNumsForReqs(&etTxn, &et) + assignSeqNumsForReqs(etTxn, &et) if _, pErr := tc.SendWrappedWith(etH, &et); pErr != nil { t.Fatalf("unexpected error: %s", pErr) } @@ -4043,7 +4043,7 @@ func TestBatchRetryCantCommitIntents(t *testing.T) { // Send a put for keyB; this currently succeeds as there's nothing to detect // the retry. - if _, pErr = tc.SendWrappedWith(roachpb.Header{Txn: &putTxn}, &putB); pErr != nil { + if _, pErr = tc.SendWrappedWith(roachpb.Header{Txn: putTxn}, &putB); pErr != nil { t.Error(pErr) } @@ -4699,7 +4699,7 @@ func TestAbortSpanError(t *testing.T) { expected.Timestamp = txn.Timestamp expected.Priority = priority expected.Status = roachpb.ABORTED - if pErr.GetTxn() == nil || !reflect.DeepEqual(pErr.GetTxn(), &expected) { + if pErr.GetTxn() == nil || !reflect.DeepEqual(pErr.GetTxn(), expected) { t.Errorf("txn does not match: %s vs. %s", pErr.GetTxn(), expected) } } else { @@ -5025,7 +5025,7 @@ func TestResolveIntentPushTxnReplyTxn(t *testing.T) { txn := newTransaction("test", roachpb.Key("test"), 1, tc.Clock()) txnPushee := txn.Clone() - pa := pushTxnArgs(txn, &txnPushee, roachpb.PUSH_ABORT) + pa := pushTxnArgs(txn, txnPushee, roachpb.PUSH_ABORT) pa.Force = true var ms enginepb.MVCCStats var ra roachpb.ResolveIntentRequest @@ -7985,10 +7985,10 @@ func TestReplicaTimestampCacheBumpNotLost(t *testing.T) { t.Fatal(pErr) } - if !reflect.DeepEqual(&origTxn, txn) { + if !reflect.DeepEqual(origTxn, txn) { t.Fatalf( "original transaction proto was mutated: %s", - pretty.Diff(&origTxn, txn), + pretty.Diff(origTxn, txn), ) } if resp.Txn == nil { @@ -8033,8 +8033,8 @@ func TestReplicaEvaluationNotTxnMutation(t *testing.T) { if pErr != nil { t.Fatal(pErr) } - if !reflect.DeepEqual(&origTxn, txn) { - t.Fatalf("transaction was mutated during evaluation: %s", pretty.Diff(&origTxn, txn)) + if !reflect.DeepEqual(origTxn, txn) { + t.Fatalf("transaction was mutated during evaluation: %s", pretty.Diff(origTxn, txn)) } } @@ -10017,7 +10017,7 @@ func TestCreateTxnRecord(t *testing.T) { run: func(txn *roachpb.Transaction, now hlc.Timestamp) error { clone := txn.Clone() clone.Restart(-1, 0, now) - bt, btH := beginTxnArgs(clone.Key, &clone) + bt, btH := beginTxnArgs(clone.Key, clone) return sendWrappedWithErr(btH, &bt) }, expTxn: func(txn *roachpb.Transaction, now hlc.Timestamp) roachpb.TransactionRecord { @@ -10046,7 +10046,7 @@ func TestCreateTxnRecord(t *testing.T) { run: func(txn *roachpb.Transaction, now hlc.Timestamp) error { clone := txn.Clone() clone.Restart(-1, 0, now) - bt, btH := beginTxnArgs(clone.Key, &clone) + bt, btH := beginTxnArgs(clone.Key, clone) return sendWrappedWithErr(btH, &bt) }, expTxn: func(txn *roachpb.Transaction, now hlc.Timestamp) roachpb.TransactionRecord { @@ -10239,7 +10239,7 @@ func TestCreateTxnRecord(t *testing.T) { // timestamp. clone := txn.Clone() clone.Restart(-1, 0, now.Add(0, 1)) - hb, hbH := heartbeatArgs(&clone, now) + hb, hbH := heartbeatArgs(clone, now) return sendWrappedWithErr(hbH, &hb) }, expError: "TransactionAbortedError(ABORT_REASON_ALREADY_COMMITTED_OR_ROLLED_BACK_POSSIBLE_REPLAY)", @@ -10514,7 +10514,7 @@ func TestCreateTxnRecord(t *testing.T) { // timestamp. clone := txn.Clone() clone.Restart(-1, 0, now.Add(0, 1)) - hb, hbH := heartbeatArgs(&clone, now) + hb, hbH := heartbeatArgs(clone, now) return sendWrappedWithErr(hbH, &hb) }, expError: "TransactionAbortedError(ABORT_REASON_ABORTED_RECORD_FOUND)", diff --git a/pkg/storage/replica_tscache.go b/pkg/storage/replica_tscache.go index 577d712e2286..7afcb2e16339 100644 --- a/pkg/storage/replica_tscache.go +++ b/pkg/storage/replica_tscache.go @@ -218,7 +218,7 @@ func (r *Replica) applyTimestampCache( if ba.Txn.Timestamp.Less(nextTS) { txn := ba.Txn.Clone() bumped = txn.Timestamp.Forward(nextTS) || bumped - ba.Txn = &txn + ba.Txn = txn } } } else { @@ -236,7 +236,7 @@ func (r *Replica) applyTimestampCache( txn := ba.Txn.Clone() bumped = txn.Timestamp.Forward(wTS.Next()) || bumped txn.WriteTooOld = true - ba.Txn = &txn + ba.Txn = txn } } } else { diff --git a/pkg/storage/replica_write.go b/pkg/storage/replica_write.go index 32e293dae04e..bb3656ffa600 100644 --- a/pkg/storage/replica_write.go +++ b/pkg/storage/replica_write.go @@ -312,7 +312,7 @@ func (r *Replica) evaluateWriteBatch( ms = enginepb.MVCCStats{} } else { // Run commit trigger manually. - innerResult, err := batcheval.RunCommitTrigger(ctx, rec, batch, &ms, *etArg, &clonedTxn) + innerResult, err := batcheval.RunCommitTrigger(ctx, rec, batch, &ms, *etArg, clonedTxn) if err != nil { return batch, ms, br, res, roachpb.NewErrorf("failed to run commit trigger: %s", err) } @@ -321,7 +321,7 @@ func (r *Replica) evaluateWriteBatch( } } - br.Txn = &clonedTxn + br.Txn = clonedTxn // Add placeholder responses for begin & end transaction requests. var resps []roachpb.ResponseUnion if hasBegin { diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 63bcd1a7aaa4..5492db718767 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -2833,9 +2833,9 @@ func (s *Store) Send( // restart (at least in the absence of a prior observed timestamp from // this node, in which case the following is a no-op). if _, ok := ba.Txn.GetObservedTimestamp(ba.Replica.NodeID); !ok { - shallowTxn := *ba.Txn - shallowTxn.UpdateObservedTimestamp(ba.Replica.NodeID, now) - ba.Txn = &shallowTxn + txnClone := ba.Txn.Clone() + txnClone.UpdateObservedTimestamp(ba.Replica.NodeID, now) + ba.Txn = txnClone } } @@ -2964,8 +2964,7 @@ func (s *Store) Send( // // See #9130. if h.Txn != nil { - clonedTxn := h.Txn.Clone() - h.Txn = &clonedTxn + h.Txn = h.Txn.Clone() } // Handle the case where we get more than one write intent error; // we need to cleanup the previous attempt to handle it to allow diff --git a/pkg/storage/txnwait/txnqueue.go b/pkg/storage/txnwait/txnqueue.go index 2e4599acbdd7..33aeef9900e7 100644 --- a/pkg/storage/txnwait/txnqueue.go +++ b/pkg/storage/txnwait/txnqueue.go @@ -78,7 +78,7 @@ func IsExpired(now hlc.Timestamp, txn *roachpb.Transaction) bool { // copy of the supplied transaction. It is necessary to fully copy // each field in the transaction to avoid race conditions. func createPushTxnResponse(txn *roachpb.Transaction) *roachpb.PushTxnResponse { - return &roachpb.PushTxnResponse{PusheeTxn: txn.Clone()} + return &roachpb.PushTxnResponse{PusheeTxn: *txn} } // A waitingPush represents a PushTxn command that is waiting on the @@ -740,7 +740,7 @@ func (q *Queue) startQueryPusherTxn( // Send an update of the pusher txn. pusher.Update(updatedPusher) - ch <- &pusher + ch <- pusher // Wait for context cancellation or indication on readyCh that the // push waiter requires another query of the pusher txn.