Skip to content

Commit

Permalink
kv: stop copying roachpb.Transaction by value when pushing
Browse files Browse the repository at this point in the history
These structs are 288 bytes large - a little too large to copy around
unnecessarily when we already have pointers to their original, immutable
instance on the heap.
  • Loading branch information
nvanbenschoten committed Jun 3, 2020
1 parent e20d2a2 commit a1568b4
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 63 deletions.
20 changes: 10 additions & 10 deletions pkg/kv/kvserver/concurrency/concurrency_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,22 +529,22 @@ func (c *cluster) makeConfig() concurrency.Config {
// PushTransaction implements the concurrency.IntentResolver interface.
func (c *cluster) PushTransaction(
ctx context.Context, pushee *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType,
) (roachpb.Transaction, *roachpb.Error) {
) (*roachpb.Transaction, *roachpb.Error) {
pusheeRecord, err := c.getTxnRecord(pushee.ID)
if err != nil {
return roachpb.Transaction{}, roachpb.NewError(err)
return nil, roachpb.NewError(err)
}
var pusherRecord *txnRecord
if h.Txn != nil {
pusherID := h.Txn.ID
pusherRecord, err = c.getTxnRecord(pusherID)
if err != nil {
return roachpb.Transaction{}, roachpb.NewError(err)
return nil, roachpb.NewError(err)
}

push, err := c.registerPush(ctx, pusherID, pushee.ID)
if err != nil {
return roachpb.Transaction{}, roachpb.NewError(err)
return nil, roachpb.NewError(err)
}
defer c.unregisterPush(push)
}
Expand All @@ -558,28 +558,28 @@ func (c *cluster) PushTransaction(
case roachpb.PUSH_ABORT:
pushed = pusheeTxn.Status.IsFinalized()
default:
return roachpb.Transaction{}, roachpb.NewErrorf("unexpected push type: %s", pushType)
return nil, roachpb.NewErrorf("unexpected push type: %s", pushType)
}
if pushed {
return pusheeTxn, nil
}
// Or the pusher aborted?
var pusherRecordSig chan struct{}
if pusherRecord != nil {
var pusherTxn roachpb.Transaction
var pusherTxn *roachpb.Transaction
pusherTxn, pusherRecordSig = pusherRecord.asTxn()
if pusherTxn.Status == roachpb.ABORTED {
log.Eventf(ctx, "detected pusher aborted")
err := roachpb.NewTransactionAbortedError(roachpb.ABORT_REASON_PUSHER_ABORTED)
return roachpb.Transaction{}, roachpb.NewError(err)
return nil, roachpb.NewError(err)
}
}
// Wait until either record is updated.
select {
case <-pusheeRecordSig:
case <-pusherRecordSig:
case <-ctx.Done():
return roachpb.Transaction{}, roachpb.NewError(ctx.Err())
return nil, roachpb.NewError(ctx.Err())
}
}
}
Expand Down Expand Up @@ -649,10 +649,10 @@ func (c *cluster) updateTxnRecord(
return nil
}

func (r *txnRecord) asTxn() (roachpb.Transaction, chan struct{}) {
func (r *txnRecord) asTxn() (*roachpb.Transaction, chan struct{}) {
r.mu.Lock()
defer r.mu.Unlock()
txn := *r.txn
txn := r.txn.Clone()
if r.updatedStatus > txn.Status {
txn.Status = r.updatedStatus
}
Expand Down
7 changes: 3 additions & 4 deletions pkg/kv/kvserver/concurrency/lock_table_waiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,9 @@ type IntentResolver interface {
// provided pushee transaction immediately, if possible. Otherwise, it will
// block until the pushee transaction is finalized or eventually can be
// pushed successfully.
// TODO(nvanbenschoten): return a *roachpb.Transaction here.
PushTransaction(
context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType,
) (roachpb.Transaction, *Error)
) (*roachpb.Transaction, *Error)

// ResolveIntent synchronously resolves the provided intent.
ResolveIntent(context.Context, roachpb.LockUpdate, intentresolver.ResolveOptions) *Error
Expand Down Expand Up @@ -437,7 +436,7 @@ func (w *lockTableWaiterImpl) pushLockTxn(
// avoids needing to push it again if we find another one of its locks and
// allows for batching of intent resolution.
if pusheeTxn.Status.IsFinalized() {
w.finalizedTxnCache.add(&pusheeTxn)
w.finalizedTxnCache.add(pusheeTxn)
}

// If the push succeeded then the lock holder transaction must have
Expand Down Expand Up @@ -467,7 +466,7 @@ func (w *lockTableWaiterImpl) pushLockTxn(
// with the responsibility to abort the intents (for example if we find the
// transaction aborted). To do better here, we need per-intent information
// on whether we need to poison.
resolve := roachpb.MakeLockUpdateWithDur(&pusheeTxn, roachpb.Span{Key: ws.key}, ws.dur)
resolve := roachpb.MakeLockUpdateWithDur(pusheeTxn, roachpb.Span{Key: ws.key}, ws.dur)
opts := intentresolver.ResolveOptions{Poison: true}
return w.ir.ResolveIntent(ctx, resolve, opts)
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/kv/kvserver/concurrency/lock_table_waiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ import (
)

type mockIntentResolver struct {
pushTxn func(context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType) (roachpb.Transaction, *Error)
pushTxn func(context.Context, *enginepb.TxnMeta, roachpb.Header, roachpb.PushTxnType) (*roachpb.Transaction, *Error)
resolveIntent func(context.Context, roachpb.LockUpdate) *Error
resolveIntents func(context.Context, []roachpb.LockUpdate) *Error
}

// mockIntentResolver implements the IntentResolver interface.
func (m *mockIntentResolver) PushTransaction(
ctx context.Context, txn *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType,
) (roachpb.Transaction, *Error) {
) (*roachpb.Transaction, *Error) {
return m.pushTxn(ctx, txn, h, pushType)
}

Expand Down Expand Up @@ -291,7 +291,7 @@ func testWaitPush(t *testing.T, k stateKind, makeReq func() Request, expPushTS h
pusheeArg *enginepb.TxnMeta,
h roachpb.Header,
pushType roachpb.PushTxnType,
) (roachpb.Transaction, *Error) {
) (*roachpb.Transaction, *Error) {
require.Equal(t, &pusheeTxn.TxnMeta, pusheeArg)
require.Equal(t, req.Txn, h.Txn)
require.Equal(t, expPushTS, h.Timestamp)
Expand All @@ -301,7 +301,7 @@ func testWaitPush(t *testing.T, k stateKind, makeReq func() Request, expPushTS h
require.Equal(t, roachpb.PUSH_TIMESTAMP, pushType)
}

resp := roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.ABORTED}
resp := &roachpb.Transaction{TxnMeta: *pusheeArg, Status: roachpb.ABORTED}

// If the lock is held, we'll try to resolve it now that
// we know the holder is ABORTED. Otherwide, immediately
Expand Down Expand Up @@ -396,8 +396,8 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) {
g.notify()
ir.pushTxn = func(
_ context.Context, _ *enginepb.TxnMeta, _ roachpb.Header, _ roachpb.PushTxnType,
) (roachpb.Transaction, *Error) {
return roachpb.Transaction{}, err1
) (*roachpb.Transaction, *Error) {
return nil, err1
}
err := w.WaitOn(ctx, req, g)
require.Equal(t, err1, err)
Expand All @@ -407,8 +407,8 @@ func TestLockTableWaiterIntentResolverError(t *testing.T) {
g.notify()
ir.pushTxn = func(
_ context.Context, _ *enginepb.TxnMeta, _ roachpb.Header, _ roachpb.PushTxnType,
) (roachpb.Transaction, *Error) {
return roachpb.Transaction{}, nil
) (*roachpb.Transaction, *Error) {
return &pusheeTxn, nil
}
ir.resolveIntent = func(_ context.Context, intent roachpb.LockUpdate) *Error {
return err2
Expand Down
14 changes: 7 additions & 7 deletions pkg/kv/kvserver/intentresolver/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func getPusherTxn(h roachpb.Header) roachpb.Transaction {
// the returned intent slice.
func updateIntentTxnStatus(
ctx context.Context,
pushedTxns map[uuid.UUID]roachpb.Transaction,
pushedTxns map[uuid.UUID]*roachpb.Transaction,
intents []roachpb.Intent,
skipIfInFlight bool,
results []roachpb.LockUpdate,
Expand All @@ -263,7 +263,7 @@ func updateIntentTxnStatus(
// It must have been skipped.
continue
}
up := roachpb.MakeLockUpdateWithDur(&pushee, roachpb.Span{Key: intent.Key}, lock.Replicated)
up := roachpb.MakeLockUpdateWithDur(pushee, roachpb.Span{Key: intent.Key}, lock.Replicated)
results = append(results, up)
}
return results
Expand All @@ -274,12 +274,12 @@ func updateIntentTxnStatus(
// to the pushed transaction.
func (ir *IntentResolver) PushTransaction(
ctx context.Context, pushTxn *enginepb.TxnMeta, h roachpb.Header, pushType roachpb.PushTxnType,
) (roachpb.Transaction, *roachpb.Error) {
) (*roachpb.Transaction, *roachpb.Error) {
pushTxns := make(map[uuid.UUID]*enginepb.TxnMeta, 1)
pushTxns[pushTxn.ID] = pushTxn
pushedTxns, pErr := ir.MaybePushTransactions(ctx, pushTxns, h, pushType, false /* skipIfInFlight */)
if pErr != nil {
return roachpb.Transaction{}, pErr
return nil, pErr
}
pushedTxn, ok := pushedTxns[pushTxn.ID]
if !ok {
Expand Down Expand Up @@ -316,7 +316,7 @@ func (ir *IntentResolver) MaybePushTransactions(
h roachpb.Header,
pushType roachpb.PushTxnType,
skipIfInFlight bool,
) (map[uuid.UUID]roachpb.Transaction, *roachpb.Error) {
) (map[uuid.UUID]*roachpb.Transaction, *roachpb.Error) {
// Decide which transactions to push and which to ignore because
// of other in-flight requests. For those transactions that we
// will be pushing, increment their ref count in the in-flight
Expand Down Expand Up @@ -374,9 +374,9 @@ func (ir *IntentResolver) MaybePushTransactions(
}

br := b.RawResponse()
pushedTxns := map[uuid.UUID]roachpb.Transaction{}
pushedTxns := make(map[uuid.UUID]*roachpb.Transaction, len(br.Responses))
for _, resp := range br.Responses {
txn := resp.GetInner().(*roachpb.PushTxnResponse).PusheeTxn
txn := &resp.GetInner().(*roachpb.PushTxnResponse).PusheeTxn
if _, ok := pushedTxns[txn.ID]; ok {
log.Fatalf(ctx, "have two PushTxn responses for %s", txn.ID)
}
Expand Down
26 changes: 13 additions & 13 deletions pkg/kv/kvserver/rangefeed/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,24 +632,24 @@ func TestProcessorTxnPushAttempt(t *testing.T) {
txn1Meta := enginepb.TxnMeta{ID: txn1, Key: keyA, WriteTimestamp: ts10, MinTimestamp: ts10}
txn2Meta := enginepb.TxnMeta{ID: txn2, Key: keyB, WriteTimestamp: ts20, MinTimestamp: ts20}
txn3Meta := enginepb.TxnMeta{ID: txn3, Key: keyC, WriteTimestamp: ts30, MinTimestamp: ts30}
txn1Proto := roachpb.Transaction{TxnMeta: txn1Meta, Status: roachpb.PENDING}
txn2Proto := roachpb.Transaction{TxnMeta: txn2Meta, Status: roachpb.PENDING}
txn3Proto := roachpb.Transaction{TxnMeta: txn3Meta, Status: roachpb.PENDING}
txn1Proto := &roachpb.Transaction{TxnMeta: txn1Meta, Status: roachpb.PENDING}
txn2Proto := &roachpb.Transaction{TxnMeta: txn2Meta, Status: roachpb.PENDING}
txn3Proto := &roachpb.Transaction{TxnMeta: txn3Meta, Status: roachpb.PENDING}

// Modifications for test 2.
txn1MetaT2Pre := enginepb.TxnMeta{ID: txn1, Key: keyA, WriteTimestamp: ts25, MinTimestamp: ts10}
txn1MetaT2Post := enginepb.TxnMeta{ID: txn1, Key: keyA, WriteTimestamp: ts50, MinTimestamp: ts10}
txn2MetaT2Post := enginepb.TxnMeta{ID: txn2, Key: keyB, WriteTimestamp: ts60, MinTimestamp: ts20}
txn3MetaT2Post := enginepb.TxnMeta{ID: txn3, Key: keyC, WriteTimestamp: ts70, MinTimestamp: ts30}
txn1ProtoT2 := roachpb.Transaction{TxnMeta: txn1MetaT2Post, Status: roachpb.COMMITTED}
txn2ProtoT2 := roachpb.Transaction{TxnMeta: txn2MetaT2Post, Status: roachpb.PENDING}
txn3ProtoT2 := roachpb.Transaction{TxnMeta: txn3MetaT2Post, Status: roachpb.PENDING}
txn1ProtoT2 := &roachpb.Transaction{TxnMeta: txn1MetaT2Post, Status: roachpb.COMMITTED}
txn2ProtoT2 := &roachpb.Transaction{TxnMeta: txn2MetaT2Post, Status: roachpb.PENDING}
txn3ProtoT2 := &roachpb.Transaction{TxnMeta: txn3MetaT2Post, Status: roachpb.PENDING}

// Modifications for test 3.
txn2MetaT3Post := enginepb.TxnMeta{ID: txn2, Key: keyB, WriteTimestamp: ts60, MinTimestamp: ts20}
txn3MetaT3Post := enginepb.TxnMeta{ID: txn3, Key: keyC, WriteTimestamp: ts90, MinTimestamp: ts30}
txn2ProtoT3 := roachpb.Transaction{TxnMeta: txn2MetaT3Post, Status: roachpb.ABORTED}
txn3ProtoT3 := roachpb.Transaction{TxnMeta: txn3MetaT3Post, Status: roachpb.PENDING}
txn2ProtoT3 := &roachpb.Transaction{TxnMeta: txn2MetaT3Post, Status: roachpb.ABORTED}
txn3ProtoT3 := &roachpb.Transaction{TxnMeta: txn3MetaT3Post, Status: roachpb.PENDING}

testNum := 0
pausePushAttemptsC := make(chan struct{})
Expand All @@ -659,7 +659,7 @@ func TestProcessorTxnPushAttempt(t *testing.T) {

// Create a TxnPusher that performs assertions during the first 3 uses.
var tp testTxnPusher
tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]roachpb.Transaction, error) {
tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) {
// The txns are not in a sorted order. Enforce one.
sort.Slice(txns, func(i, j int) bool {
return bytes.Compare(txns[i].Key, txns[j].Key) < 0
Expand All @@ -674,27 +674,27 @@ func TestProcessorTxnPushAttempt(t *testing.T) {
require.Equal(t, txn3Meta, txns[2])

// Push does not succeed. Protos not at larger ts.
return []roachpb.Transaction{txn1Proto, txn2Proto, txn3Proto}, nil
return []*roachpb.Transaction{txn1Proto, txn2Proto, txn3Proto}, nil
case 2:
require.Equal(t, 3, len(txns))
require.Equal(t, txn1MetaT2Pre, txns[0])
require.Equal(t, txn2Meta, txns[1])
require.Equal(t, txn3Meta, txns[2])

// Push succeeds. Return new protos.
return []roachpb.Transaction{txn1ProtoT2, txn2ProtoT2, txn3ProtoT2}, nil
return []*roachpb.Transaction{txn1ProtoT2, txn2ProtoT2, txn3ProtoT2}, nil
case 3:
require.Equal(t, 2, len(txns))
require.Equal(t, txn2MetaT2Post, txns[0])
require.Equal(t, txn3MetaT2Post, txns[1])

// Push succeeds. Return new protos.
return []roachpb.Transaction{txn2ProtoT3, txn3ProtoT3}, nil
return []*roachpb.Transaction{txn2ProtoT3, txn3ProtoT3}, nil
default:
return nil, nil
}
})
tp.mockCleanupTxnIntentsAsync(func(txns []roachpb.Transaction) error {
tp.mockCleanupTxnIntentsAsync(func(txns []*roachpb.Transaction) error {
switch testNum {
case 1:
require.Equal(t, 0, len(txns))
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/rangefeed/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,10 @@ func (s *initResolvedTSScan) Cancel() {
type TxnPusher interface {
// PushTxns attempts to push the specified transactions to a new
// timestamp. It returns the resulting transaction protos.
PushTxns(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]roachpb.Transaction, error)
PushTxns(context.Context, []enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error)
// CleanupTxnIntentsAsync asynchronously cleans up intents owned
// by the specified transactions.
CleanupTxnIntentsAsync(context.Context, []roachpb.Transaction) error
CleanupTxnIntentsAsync(context.Context, []*roachpb.Transaction) error
}

// txnPushAttempt pushes all old transactions that have unresolved intents on
Expand Down Expand Up @@ -176,7 +176,7 @@ func (a *txnPushAttempt) pushOldTxns(ctx context.Context) error {

// Inform the Processor of the results of the push for each transaction.
ops := make([]enginepb.MVCCLogicalOp, len(pushedTxns))
var toCleanup []roachpb.Transaction
var toCleanup []*roachpb.Transaction
for i, txn := range pushedTxns {
switch txn.Status {
case roachpb.PENDING, roachpb.STAGING:
Expand Down
26 changes: 13 additions & 13 deletions pkg/kv/kvserver/rangefeed/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,29 +249,29 @@ func TestInitResolvedTSScan(t *testing.T) {
}

type testTxnPusher struct {
pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]roachpb.Transaction, error)
cleanupTxnIntentsAsyncFn func([]roachpb.Transaction) error
pushTxnsFn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error)
cleanupTxnIntentsAsyncFn func([]*roachpb.Transaction) error
}

func (tp *testTxnPusher) PushTxns(
ctx context.Context, txns []enginepb.TxnMeta, ts hlc.Timestamp,
) ([]roachpb.Transaction, error) {
) ([]*roachpb.Transaction, error) {
return tp.pushTxnsFn(txns, ts)
}

func (tp *testTxnPusher) CleanupTxnIntentsAsync(
ctx context.Context, txns []roachpb.Transaction,
ctx context.Context, txns []*roachpb.Transaction,
) error {
return tp.cleanupTxnIntentsAsyncFn(txns)
}

func (tp *testTxnPusher) mockPushTxns(
fn func([]enginepb.TxnMeta, hlc.Timestamp) ([]roachpb.Transaction, error),
fn func([]enginepb.TxnMeta, hlc.Timestamp) ([]*roachpb.Transaction, error),
) {
tp.pushTxnsFn = fn
}

func (tp *testTxnPusher) mockCleanupTxnIntentsAsync(fn func([]roachpb.Transaction) error) {
func (tp *testTxnPusher) mockCleanupTxnIntentsAsync(fn func([]*roachpb.Transaction) error) {
tp.cleanupTxnIntentsAsyncFn = fn
}

Expand All @@ -284,25 +284,25 @@ func TestTxnPushAttempt(t *testing.T) {
txn1Meta := enginepb.TxnMeta{ID: txn1, Key: keyA, WriteTimestamp: ts1, MinTimestamp: ts1}
txn2Meta := enginepb.TxnMeta{ID: txn2, Key: keyB, WriteTimestamp: ts2, MinTimestamp: ts2}
txn3Meta := enginepb.TxnMeta{ID: txn3, Key: keyC, WriteTimestamp: ts3, MinTimestamp: ts3}
txn1Proto := roachpb.Transaction{TxnMeta: txn1Meta, Status: roachpb.PENDING}
txn2Proto := roachpb.Transaction{TxnMeta: txn2Meta, Status: roachpb.COMMITTED}
txn3Proto := roachpb.Transaction{TxnMeta: txn3Meta, Status: roachpb.ABORTED}
txn1Proto := &roachpb.Transaction{TxnMeta: txn1Meta, Status: roachpb.PENDING}
txn2Proto := &roachpb.Transaction{TxnMeta: txn2Meta, Status: roachpb.COMMITTED}
txn3Proto := &roachpb.Transaction{TxnMeta: txn3Meta, Status: roachpb.ABORTED}

// Run a txnPushAttempt.
var tp testTxnPusher
tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]roachpb.Transaction, error) {
tp.mockPushTxns(func(txns []enginepb.TxnMeta, ts hlc.Timestamp) ([]*roachpb.Transaction, error) {
require.Equal(t, 3, len(txns))
require.Equal(t, txn1Meta, txns[0])
require.Equal(t, txn2Meta, txns[1])
require.Equal(t, txn3Meta, txns[2])
require.Equal(t, hlc.Timestamp{WallTime: 15}, ts)

// Return all three protos. The PENDING txn is pushed.
txn1ProtoPushed := txn1Proto
txn1ProtoPushed := txn1Proto.Clone()
txn1ProtoPushed.WriteTimestamp = ts
return []roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto}, nil
return []*roachpb.Transaction{txn1ProtoPushed, txn2Proto, txn3Proto}, nil
})
tp.mockCleanupTxnIntentsAsync(func(txns []roachpb.Transaction) error {
tp.mockCleanupTxnIntentsAsync(func(txns []*roachpb.Transaction) error {
require.Equal(t, 2, len(txns))
require.Equal(t, txn2Proto, txns[0])
require.Equal(t, txn3Proto, txns[1])
Expand Down
Loading

0 comments on commit a1568b4

Please sign in to comment.