Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-19.1: roachpb: refine Transaction proto cloning #36114

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func NewMockTransactionalSender(
) (*roachpb.BatchResponse, *roachpb.Error),
txn *roachpb.Transaction,
) *MockTransactionalSender {
return &MockTransactionalSender{senderFunc: f, txn: txn.Clone()}
return &MockTransactionalSender{senderFunc: f, txn: *txn}
}

// Send is part of the TxnSender interface.
Expand Down Expand Up @@ -356,8 +356,7 @@ func (m *MockTransactionalSender) Epoch() uint32 { panic("unimplemented") }

// SerializeTxn is part of the TxnSender interface.
func (m *MockTransactionalSender) SerializeTxn() *roachpb.Transaction {
cp := m.txn.Clone()
return &cp
return m.txn.Clone()
}

// UpdateStateOnRemoteRetryableErr is part of the TxnSender interface.
Expand Down
5 changes: 2 additions & 3 deletions pkg/internal/client/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,12 @@ func newTestTxnFactory(
}
}
if ba.Txn != nil && br.Txn == nil {
txnClone := ba.Txn.Clone()
br.Txn = &txnClone
br.Txn = ba.Txn.Clone()
if pErr == nil {
br.Txn.Status = status
}
// Update the MockTxnSender's proto.
*txn = br.Txn.Clone()
*txn = *br.Txn
}
return br, pErr
})
Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,8 +489,7 @@ func TestImmutableBatchArgs(t *testing.T) {
args roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
reply := args.CreateReply()
txnClone := args.Txn.Clone()
reply.Txn = &txnClone
reply.Txn = args.Txn.Clone()
reply.Txn.Timestamp = hlc.MaxTimestamp
return reply, nil
}
Expand Down
11 changes: 4 additions & 7 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ func (tc *TxnCoordSender) GetMeta(
defer tc.mu.Unlock()
// Copy mutable state so access is safe for the caller.
var meta roachpb.TxnCoordMeta
meta.Txn = tc.mu.txn.Clone()
meta.Txn = tc.mu.txn
for _, reqInt := range tc.interceptorStack {
reqInt.populateMetaLocked(&meta)
}
Expand Down Expand Up @@ -690,9 +690,8 @@ func generateTxnDeadlineExceededErr(
"txn timestamp pushed too much; deadline exceeded by %s (%s > %s), "+
"original timestamp %s ago (%s)",
exceededBy, txn.Timestamp, deadline, fromStart, txn.OrigTimestamp)
txnCpy := txn.Clone()
return roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_COMMIT_DEADLINE_EXCEEDED, extraMsg), &txnCpy)
roachpb.NewTransactionRetryError(roachpb.RETRY_COMMIT_DEADLINE_EXCEEDED, extraMsg), txn)
}

// commitReadOnlyTxnLocked "commits" a read-only txn. It is equivalent, but
Expand Down Expand Up @@ -790,8 +789,7 @@ func (tc *TxnCoordSender) Send(
}
// Clone the Txn's Proto so that future modifications can be made without
// worrying about synchronization.
newTxn := tc.mu.txn.Clone()
ba.Txn = &newTxn
ba.Txn = tc.mu.txn.Clone()

// Send the command through the txnInterceptor stack.
br, pErr := tc.interceptorStack[0].SendLocked(ctx, ba)
Expand Down Expand Up @@ -1239,6 +1237,5 @@ func (tc *TxnCoordSender) Epoch() uint32 {
func (tc *TxnCoordSender) SerializeTxn() *roachpb.Transaction {
tc.mu.Lock()
defer tc.mu.Unlock()
cpy := tc.mu.txn.Clone()
return &cpy
return tc.mu.txn.Clone()
}
31 changes: 11 additions & 20 deletions pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,8 +914,7 @@ func TestTxnCoordSenderNoDuplicateIntents(t *testing.T) {
}
}
br := ba.CreateReply()
txnClone := ba.Txn.Clone()
br.Txn = &txnClone
br.Txn = ba.Txn.Clone()
return br, nil
}
ambient := log.AmbientContext{Tracer: tracing.NewTracer()}
Expand Down Expand Up @@ -1261,8 +1260,7 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) {
_ context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
br := ba.CreateReply()
txn := ba.Txn.Clone()
br.Txn = &txn
br.Txn = ba.Txn.Clone()

if _, hasPut := ba.GetArg(roachpb.Put); hasPut {
if _, ok := ba.Requests[0].GetInner().(*roachpb.PutRequest); !ok {
Expand All @@ -1271,8 +1269,7 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) {
union := &br.Responses[0] // avoid operating on copy
union.MustSetInner(&roachpb.PutResponse{})
if ba.Txn != nil && br.Txn == nil {
txnClone := ba.Txn.Clone()
br.Txn = &txnClone
br.Txn = ba.Txn.Clone()
br.Txn.Status = roachpb.PENDING
}
} else if et, hasET := ba.GetArg(roachpb.EndTransaction); hasET {
Expand Down Expand Up @@ -1353,8 +1350,7 @@ func (s *mockSender) Send(
}
// If none of the matchers triggered, just create an empty reply.
br := ba.CreateReply()
txn := ba.Txn.Clone()
br.Txn = &txn
br.Txn = ba.Txn.Clone()
return br, nil
}

Expand Down Expand Up @@ -1473,8 +1469,7 @@ func TestOnePCErrorTracking(t *testing.T) {
resp := ba.CreateReply()
// Set the response's txn to the Aborted status (as the server would). This
// will make the TxnCoordSender stop the heartbeat loop.
txnCopy := ba.Txn.Clone()
resp.Txn = &txnCopy
resp.Txn = ba.Txn.Clone()
resp.Txn.Status = roachpb.ABORTED
return resp, nil
})
Expand Down Expand Up @@ -1937,8 +1932,7 @@ func TestTransactionKeyNotChangedInRestart(t *testing.T) {
// Ignore the final EndTxnRequest.
if _, ok := ba.GetArg(roachpb.EndTransaction); ok {
br := ba.CreateReply()
txn := ba.Txn.Clone()
br.Txn = &txn
br.Txn = ba.Txn.Clone()
return nil, nil
}

Expand Down Expand Up @@ -2134,9 +2128,8 @@ func TestTxnRequestTxnTimestamp(t *testing.T) {
curReq, req.expRequestTS, ba.Txn.Timestamp)
}

txnClone := ba.Txn.Clone()
br := ba.CreateReply()
br.Txn = &txnClone
br.Txn = ba.Txn.Clone()
br.Txn.Timestamp.Forward(requests[curReq].responseTS)
return br, nil
})
Expand Down Expand Up @@ -2171,9 +2164,8 @@ func TestReadOnlyTxnObeysDeadline(t *testing.T) {
sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
if _, ok := ba.GetArg(roachpb.Get); ok {
manual.Increment(100)
txnClone := ba.Txn.Clone()
br := ba.CreateReply()
br.Txn = &txnClone
br.Txn = ba.Txn.Clone()
br.Txn.Timestamp.Forward(clock.Now())
return br, nil
}
Expand Down Expand Up @@ -2310,8 +2302,7 @@ func TestAnchorKey(t *testing.T) {
t.Fatalf("expected anchor %q, got %q", key2, ba.Txn.Key)
}
br := ba.CreateReply()
txn := ba.Txn.Clone()
br.Txn = &txn
br.Txn = ba.Txn.Clone()
return br, nil
}

Expand Down Expand Up @@ -2353,8 +2344,8 @@ func TestLeafTxnClientRejectError(t *testing.T) {
txn := ba.Txn.Clone()
txn.Status = roachpb.ABORTED
return roachpb.NewErrorWithTxn(
roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_UNKNOWN),
&txn)
roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_UNKNOWN), txn,
)
}
return nil
},
Expand Down
5 changes: 2 additions & 3 deletions pkg/kv/txn_interceptor_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ func (tc *txnCommitter) closeLocked() {}

func cloneWithStatus(txn *roachpb.Transaction, s roachpb.TransactionStatus) *roachpb.Transaction {
clone := txn.Clone()
txn = &clone
txn.Status = s
return txn
clone.Status = s
return clone
}
4 changes: 2 additions & 2 deletions pkg/kv/txn_interceptor_heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
}

ba := roachpb.BatchRequest{}
ba.Txn = &txn
ba.Txn = txn

hb := &roachpb.HeartbeatTxnRequest{
RequestHeader: roachpb.RequestHeader{
Expand Down Expand Up @@ -464,7 +464,7 @@ func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) {

// Construct a batch with an EndTransaction request.
ba := roachpb.BatchRequest{}
ba.Header = roachpb.Header{Txn: &txn}
ba.Header = roachpb.Header{Txn: txn}
ba.Add(&roachpb.EndTransactionRequest{
Commit: false,
// Resolved intents should maintain an abort span entry to prevent
Expand Down
3 changes: 1 addition & 2 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,7 @@ func (h *BatchResponse_Header) combine(o BatchResponse_Header) error {
h.Timestamp.Forward(o.Timestamp)
if txn := o.Txn; txn != nil {
if h.Txn == nil {
txnClone := txn.Clone()
h.Txn = &txnClone
h.Txn = txn.Clone()
} else {
h.Txn.Update(txn)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/roachpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (ba *BatchRequest) UpdateTxn(o *Transaction) {
}
clonedTxn := ba.Txn.Clone()
clonedTxn.Update(o)
ba.Txn = &clonedTxn
ba.Txn = clonedTxn
}

// IsLeaseRequest returns whether the batch consists of a single RequestLease
Expand Down
46 changes: 23 additions & 23 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,19 +813,10 @@ func (t Transaction) LastActive() hlc.Timestamp {
return ts
}

// Clone creates a copy of the given transaction. The copy is "mostly" deep,
// but does share pieces of memory with the original such as Key, ID and the
// keys with the intent spans.
func (t Transaction) Clone() Transaction {
mt := t.ObservedTimestamps
if mt != nil {
t.ObservedTimestamps = make([]ObservedTimestamp, len(mt))
copy(t.ObservedTimestamps, mt)
}
// Note that we're not cloning the span keys under the assumption that the
// keys themselves are not mutable.
t.Intents = append([]Span(nil), t.Intents...)
return t
// Clone creates a copy of the given transaction. The copy is shallow because
// none of the references held by a transaction allow interior mutability.
func (t Transaction) Clone() *Transaction {
return &t
}

// AssertInitialized crashes if the transaction is not initialized.
Expand Down Expand Up @@ -979,7 +970,7 @@ func (t *Transaction) Update(o *Transaction) {
}
o.AssertInitialized(context.TODO())
if t.ID == (uuid.UUID{}) {
*t = o.Clone()
*t = *o
return
}
if len(t.Key) == 0 {
Expand Down Expand Up @@ -1174,7 +1165,7 @@ func PrepareTransactionForRetry(
log.Fatalf(ctx, "missing txn for retryable error: %s", pErr)
}

txn := pErr.GetTxn().Clone()
txn := *pErr.GetTxn()
aborted := false
switch tErr := pErr.GetDetail().(type) {
case *TransactionAbortedError:
Expand Down Expand Up @@ -1259,7 +1250,7 @@ func CanTransactionRetryAtRefreshedTimestamp(
newTxn.RefreshedTimestamp.Forward(newTxn.Timestamp)
newTxn.WriteTooOld = false

return true, &newTxn
return true, newTxn
}

func readWithinUncertaintyIntervalRetryTimestamp(
Expand Down Expand Up @@ -1731,7 +1722,8 @@ func (kv KeyValueByKey) Swap(i, j int) {

var _ sort.Interface = KeyValueByKey{}

// observedTimestampSlice maintains a sorted list of observed timestamps.
// observedTimestampSlice maintains an immutable sorted list of observed
// timestamps.
type observedTimestampSlice []ObservedTimestamp

func (s observedTimestampSlice) index(nodeID NodeID) int {
Expand All @@ -1753,19 +1745,27 @@ func (s observedTimestampSlice) get(nodeID NodeID) (hlc.Timestamp, bool) {
}

// update the timestamp for the specified node, or add a new entry in the
// correct (sorted) location.
// correct (sorted) location. The receiver is not mutated.
func (s observedTimestampSlice) update(
nodeID NodeID, timestamp hlc.Timestamp,
) observedTimestampSlice {
i := s.index(nodeID)
if i < len(s) && s[i].NodeID == nodeID {
if timestamp.Less(s[i].Timestamp) {
s[i].Timestamp = timestamp
// The input slice is immutable, so copy and update.
cpy := make(observedTimestampSlice, len(s))
copy(cpy, s)
cpy[i].Timestamp = timestamp
return cpy
}
return s
}
s = append(s, ObservedTimestamp{})
copy(s[i+1:], s[i:])
s[i] = ObservedTimestamp{NodeID: nodeID, Timestamp: timestamp}
return s
// The input slice is immutable, so copy and update. Don't append to
// avoid an allocation. Doing so could invalidate a previous update
// to this receiver.
cpy := make(observedTimestampSlice, len(s)+1)
copy(cpy[:i], s[:i])
cpy[i] = ObservedTimestamp{NodeID: nodeID, Timestamp: timestamp}
copy(cpy[i+1:], s[i:])
return cpy
}
Loading