Skip to content

Commit

Permalink
Merge #35762
Browse files Browse the repository at this point in the history
35762: roachpb: refine Transaction proto cloning r=nvanbenschoten a=nvanbenschoten

Fixes #35803.

This PR includes the final two commits from #35719.

> By making Transaction.ObservedTimestamps immutable (which it almost already was), we can prohibit all interior mutability of references within Transaction, give it value semantics, and eliminate the distinction between "shallow" and "deep" object cloning. This reduces the cost of a clone to a single straightforward allocation and makes working with the object easier to think about.

cc. @tbg 

Co-authored-by: Nathan VanBenschoten <[email protected]>
  • Loading branch information
craig[bot] and nvanbenschoten committed Mar 25, 2019
2 parents 4b368a1 + c5dc833 commit 7a1d640
Show file tree
Hide file tree
Showing 32 changed files with 151 additions and 166 deletions.
5 changes: 2 additions & 3 deletions pkg/internal/client/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func NewMockTransactionalSender(
) (*roachpb.BatchResponse, *roachpb.Error),
txn *roachpb.Transaction,
) *MockTransactionalSender {
return &MockTransactionalSender{senderFunc: f, txn: txn.Clone()}
return &MockTransactionalSender{senderFunc: f, txn: *txn}
}

// Send is part of the TxnSender interface.
Expand Down Expand Up @@ -356,8 +356,7 @@ func (m *MockTransactionalSender) Epoch() uint32 { panic("unimplemented") }

// SerializeTxn is part of the TxnSender interface.
func (m *MockTransactionalSender) SerializeTxn() *roachpb.Transaction {
cp := m.txn.Clone()
return &cp
return m.txn.Clone()
}

// UpdateStateOnRemoteRetryableErr is part of the TxnSender interface.
Expand Down
5 changes: 2 additions & 3 deletions pkg/internal/client/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,12 @@ func newTestTxnFactory(
}
}
if ba.Txn != nil && br.Txn == nil {
txnClone := ba.Txn.Clone()
br.Txn = &txnClone
br.Txn = ba.Txn.Clone()
if pErr == nil {
br.Txn.Status = status
}
// Update the MockTxnSender's proto.
*txn = br.Txn.Clone()
*txn = *br.Txn
}
return br, pErr
})
Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,8 +489,7 @@ func TestImmutableBatchArgs(t *testing.T) {
args roachpb.BatchRequest,
) (*roachpb.BatchResponse, error) {
reply := args.CreateReply()
txnClone := args.Txn.Clone()
reply.Txn = &txnClone
reply.Txn = args.Txn.Clone()
reply.Txn.Timestamp = hlc.MaxTimestamp
return reply, nil
}
Expand Down
11 changes: 4 additions & 7 deletions pkg/kv/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ func (tc *TxnCoordSender) GetMeta(
defer tc.mu.Unlock()
// Copy mutable state so access is safe for the caller.
var meta roachpb.TxnCoordMeta
meta.Txn = tc.mu.txn.Clone()
meta.Txn = tc.mu.txn
for _, reqInt := range tc.interceptorStack {
reqInt.populateMetaLocked(&meta)
}
Expand Down Expand Up @@ -690,9 +690,8 @@ func generateTxnDeadlineExceededErr(
"txn timestamp pushed too much; deadline exceeded by %s (%s > %s), "+
"original timestamp %s ago (%s)",
exceededBy, txn.Timestamp, deadline, fromStart, txn.OrigTimestamp)
txnCpy := txn.Clone()
return roachpb.NewErrorWithTxn(
roachpb.NewTransactionRetryError(roachpb.RETRY_COMMIT_DEADLINE_EXCEEDED, extraMsg), &txnCpy)
roachpb.NewTransactionRetryError(roachpb.RETRY_COMMIT_DEADLINE_EXCEEDED, extraMsg), txn)
}

// commitReadOnlyTxnLocked "commits" a read-only txn. It is equivalent, but
Expand Down Expand Up @@ -790,8 +789,7 @@ func (tc *TxnCoordSender) Send(
}
// Clone the Txn's Proto so that future modifications can be made without
// worrying about synchronization.
newTxn := tc.mu.txn.Clone()
ba.Txn = &newTxn
ba.Txn = tc.mu.txn.Clone()

// Send the command through the txnInterceptor stack.
br, pErr := tc.interceptorStack[0].SendLocked(ctx, ba)
Expand Down Expand Up @@ -1239,6 +1237,5 @@ func (tc *TxnCoordSender) Epoch() uint32 {
func (tc *TxnCoordSender) SerializeTxn() *roachpb.Transaction {
tc.mu.Lock()
defer tc.mu.Unlock()
cpy := tc.mu.txn.Clone()
return &cpy
return tc.mu.txn.Clone()
}
31 changes: 11 additions & 20 deletions pkg/kv/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -914,8 +914,7 @@ func TestTxnCoordSenderNoDuplicateIntents(t *testing.T) {
}
}
br := ba.CreateReply()
txnClone := ba.Txn.Clone()
br.Txn = &txnClone
br.Txn = ba.Txn.Clone()
return br, nil
}
ambient := log.AmbientContext{Tracer: tracing.NewTracer()}
Expand Down Expand Up @@ -1261,8 +1260,7 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) {
_ context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {
br := ba.CreateReply()
txn := ba.Txn.Clone()
br.Txn = &txn
br.Txn = ba.Txn.Clone()

if _, hasPut := ba.GetArg(roachpb.Put); hasPut {
if _, ok := ba.Requests[0].GetInner().(*roachpb.PutRequest); !ok {
Expand All @@ -1271,8 +1269,7 @@ func TestAbortTransactionOnCommitErrors(t *testing.T) {
union := &br.Responses[0] // avoid operating on copy
union.MustSetInner(&roachpb.PutResponse{})
if ba.Txn != nil && br.Txn == nil {
txnClone := ba.Txn.Clone()
br.Txn = &txnClone
br.Txn = ba.Txn.Clone()
br.Txn.Status = roachpb.PENDING
}
} else if et, hasET := ba.GetArg(roachpb.EndTransaction); hasET {
Expand Down Expand Up @@ -1353,8 +1350,7 @@ func (s *mockSender) Send(
}
// If none of the matchers triggered, just create an empty reply.
br := ba.CreateReply()
txn := ba.Txn.Clone()
br.Txn = &txn
br.Txn = ba.Txn.Clone()
return br, nil
}

Expand Down Expand Up @@ -1473,8 +1469,7 @@ func TestOnePCErrorTracking(t *testing.T) {
resp := ba.CreateReply()
// Set the response's txn to the Aborted status (as the server would). This
// will make the TxnCoordSender stop the heartbeat loop.
txnCopy := ba.Txn.Clone()
resp.Txn = &txnCopy
resp.Txn = ba.Txn.Clone()
resp.Txn.Status = roachpb.ABORTED
return resp, nil
})
Expand Down Expand Up @@ -1937,8 +1932,7 @@ func TestTransactionKeyNotChangedInRestart(t *testing.T) {
// Ignore the final EndTxnRequest.
if _, ok := ba.GetArg(roachpb.EndTransaction); ok {
br := ba.CreateReply()
txn := ba.Txn.Clone()
br.Txn = &txn
br.Txn = ba.Txn.Clone()
return nil, nil
}

Expand Down Expand Up @@ -2134,9 +2128,8 @@ func TestTxnRequestTxnTimestamp(t *testing.T) {
curReq, req.expRequestTS, ba.Txn.Timestamp)
}

txnClone := ba.Txn.Clone()
br := ba.CreateReply()
br.Txn = &txnClone
br.Txn = ba.Txn.Clone()
br.Txn.Timestamp.Forward(requests[curReq].responseTS)
return br, nil
})
Expand Down Expand Up @@ -2171,9 +2164,8 @@ func TestReadOnlyTxnObeysDeadline(t *testing.T) {
sender.match(func(ba roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error) {
if _, ok := ba.GetArg(roachpb.Get); ok {
manual.Increment(100)
txnClone := ba.Txn.Clone()
br := ba.CreateReply()
br.Txn = &txnClone
br.Txn = ba.Txn.Clone()
br.Txn.Timestamp.Forward(clock.Now())
return br, nil
}
Expand Down Expand Up @@ -2310,8 +2302,7 @@ func TestAnchorKey(t *testing.T) {
t.Fatalf("expected anchor %q, got %q", key2, ba.Txn.Key)
}
br := ba.CreateReply()
txn := ba.Txn.Clone()
br.Txn = &txn
br.Txn = ba.Txn.Clone()
return br, nil
}

Expand Down Expand Up @@ -2353,8 +2344,8 @@ func TestLeafTxnClientRejectError(t *testing.T) {
txn := ba.Txn.Clone()
txn.Status = roachpb.ABORTED
return roachpb.NewErrorWithTxn(
roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_UNKNOWN),
&txn)
roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_UNKNOWN), txn,
)
}
return nil
},
Expand Down
5 changes: 2 additions & 3 deletions pkg/kv/txn_interceptor_committer.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ func (tc *txnCommitter) closeLocked() {}

func cloneWithStatus(txn *roachpb.Transaction, s roachpb.TransactionStatus) *roachpb.Transaction {
clone := txn.Clone()
txn = &clone
txn.Status = s
return txn
clone.Status = s
return clone
}
4 changes: 2 additions & 2 deletions pkg/kv/txn_interceptor_heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {
}

ba := roachpb.BatchRequest{}
ba.Txn = &txn
ba.Txn = txn

hb := &roachpb.HeartbeatTxnRequest{
RequestHeader: roachpb.RequestHeader{
Expand Down Expand Up @@ -464,7 +464,7 @@ func (h *txnHeartbeater) abortTxnAsyncLocked(ctx context.Context) {

// Construct a batch with an EndTransaction request.
ba := roachpb.BatchRequest{}
ba.Header = roachpb.Header{Txn: &txn}
ba.Header = roachpb.Header{Txn: txn}
ba.Add(&roachpb.EndTransactionRequest{
Commit: false,
// Resolved intents should maintain an abort span entry to prevent
Expand Down
3 changes: 1 addition & 2 deletions pkg/roachpb/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,8 +385,7 @@ func (h *BatchResponse_Header) combine(o BatchResponse_Header) error {
h.Timestamp.Forward(o.Timestamp)
if txn := o.Txn; txn != nil {
if h.Txn == nil {
txnClone := txn.Clone()
h.Txn = &txnClone
h.Txn = txn.Clone()
} else {
h.Txn.Update(txn)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/roachpb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (ba *BatchRequest) UpdateTxn(o *Transaction) {
}
clonedTxn := ba.Txn.Clone()
clonedTxn.Update(o)
ba.Txn = &clonedTxn
ba.Txn = clonedTxn
}

// IsLeaseRequest returns whether the batch consists of a single RequestLease
Expand Down
46 changes: 23 additions & 23 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,19 +813,10 @@ func (t Transaction) LastActive() hlc.Timestamp {
return ts
}

// Clone creates a copy of the given transaction. The copy is "mostly" deep,
// but does share pieces of memory with the original such as Key, ID and the
// keys with the intent spans.
func (t Transaction) Clone() Transaction {
mt := t.ObservedTimestamps
if mt != nil {
t.ObservedTimestamps = make([]ObservedTimestamp, len(mt))
copy(t.ObservedTimestamps, mt)
}
// Note that we're not cloning the span keys under the assumption that the
// keys themselves are not mutable.
t.Intents = append([]Span(nil), t.Intents...)
return t
// Clone creates a copy of the given transaction. The copy is shallow because
// none of the references held by a transaction allow interior mutability.
func (t Transaction) Clone() *Transaction {
return &t
}

// AssertInitialized crashes if the transaction is not initialized.
Expand Down Expand Up @@ -979,7 +970,7 @@ func (t *Transaction) Update(o *Transaction) {
}
o.AssertInitialized(context.TODO())
if t.ID == (uuid.UUID{}) {
*t = o.Clone()
*t = *o
return
}
if len(t.Key) == 0 {
Expand Down Expand Up @@ -1174,7 +1165,7 @@ func PrepareTransactionForRetry(
log.Fatalf(ctx, "missing txn for retryable error: %s", pErr)
}

txn := pErr.GetTxn().Clone()
txn := *pErr.GetTxn()
aborted := false
switch tErr := pErr.GetDetail().(type) {
case *TransactionAbortedError:
Expand Down Expand Up @@ -1259,7 +1250,7 @@ func CanTransactionRetryAtRefreshedTimestamp(
newTxn.RefreshedTimestamp.Forward(newTxn.Timestamp)
newTxn.WriteTooOld = false

return true, &newTxn
return true, newTxn
}

func readWithinUncertaintyIntervalRetryTimestamp(
Expand Down Expand Up @@ -1731,7 +1722,8 @@ func (kv KeyValueByKey) Swap(i, j int) {

var _ sort.Interface = KeyValueByKey{}

// observedTimestampSlice maintains a sorted list of observed timestamps.
// observedTimestampSlice maintains an immutable sorted list of observed
// timestamps.
type observedTimestampSlice []ObservedTimestamp

func (s observedTimestampSlice) index(nodeID NodeID) int {
Expand All @@ -1753,19 +1745,27 @@ func (s observedTimestampSlice) get(nodeID NodeID) (hlc.Timestamp, bool) {
}

// update the timestamp for the specified node, or add a new entry in the
// correct (sorted) location.
// correct (sorted) location. The receiver is not mutated.
func (s observedTimestampSlice) update(
nodeID NodeID, timestamp hlc.Timestamp,
) observedTimestampSlice {
i := s.index(nodeID)
if i < len(s) && s[i].NodeID == nodeID {
if timestamp.Less(s[i].Timestamp) {
s[i].Timestamp = timestamp
// The input slice is immutable, so copy and update.
cpy := make(observedTimestampSlice, len(s))
copy(cpy, s)
cpy[i].Timestamp = timestamp
return cpy
}
return s
}
s = append(s, ObservedTimestamp{})
copy(s[i+1:], s[i:])
s[i] = ObservedTimestamp{NodeID: nodeID, Timestamp: timestamp}
return s
// The input slice is immutable, so copy and update. Don't append to
// avoid an allocation. Doing so could invalidate a previous update
// to this receiver.
cpy := make(observedTimestampSlice, len(s)+1)
copy(cpy[:i], s[:i])
cpy[i] = ObservedTimestamp{NodeID: nodeID, Timestamp: timestamp}
copy(cpy[i+1:], s[i:])
return cpy
}
Loading

0 comments on commit 7a1d640

Please sign in to comment.