Skip to content

Commit

Permalink
intentresolver: cleanup CleanupTxnIntentsOnGCAsync()
Browse files Browse the repository at this point in the history
Before this patch, IntentResolver.CleanupTxnIntentsOnGCAsync() was taking
both a transaction and an []LockUpdate. This was confusing, because each
LockUpdate also has a TxnMeta in it. The implementation was also
surprising to me, since the method pushes the txn and then updates the
intents with the results of the push one by one.
The method was always called with a txn and all its intents; I don't
think it would make sense any other way. So this patch simplifies the
interface: the method now only takes a txn and then reads the intents
from it.

Release note: None
  • Loading branch information
andreimatei committed Aug 20, 2020
1 parent ff7e64c commit 75dbf3d
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 46 deletions.
2 changes: 1 addition & 1 deletion pkg/cli/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ func runDebugGCCmd(cmd *cobra.Command, args []string) error {
now, thresh, policy,
gc.NoopGCer{},
func(_ context.Context, _ []roachpb.Intent) error { return nil },
func(_ context.Context, _ *roachpb.Transaction, _ []roachpb.LockUpdate) error { return nil },
func(_ context.Context, _ *roachpb.Transaction) error { return nil },
)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ type CleanupIntentsFunc func(context.Context, []roachpb.Intent) error
// transaction record, pushing the transaction first if it is
// PENDING. Once all intents are resolved successfully, removes the
// transaction record.
type CleanupTxnIntentsAsyncFunc func(context.Context, *roachpb.Transaction, []roachpb.LockUpdate) error
type CleanupTxnIntentsAsyncFunc func(context.Context, *roachpb.Transaction) error

// Run runs garbage collection for the specified descriptor on the
// provided Engine (which is not mutated). It uses the provided gcFn
Expand Down Expand Up @@ -423,7 +423,7 @@ func processLocalKeyRange(
// If the transaction needs to be pushed or there are intents to
// resolve, invoke the cleanup function.
if !txn.Status.IsFinalized() || len(txn.LockSpans) > 0 {
return cleanupTxnIntentsAsyncFn(ctx, txn, txn.LocksAsLockUpdates())
return cleanupTxnIntentsAsyncFn(ctx, txn)
}
b.FlushingAdd(ctx, key)
return nil
Expand Down
8 changes: 3 additions & 5 deletions pkg/kv/kvserver/gc/gc_random_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func BenchmarkRun(b *testing.B) {
func(ctx context.Context, intents []roachpb.Intent) error {
return nil
},
func(ctx context.Context, txn *roachpb.Transaction, intents []roachpb.LockUpdate) error {
func(ctx context.Context, txn *roachpb.Transaction) error {
return nil
})
}
Expand Down Expand Up @@ -207,10 +207,8 @@ func (f *fakeGCer) GC(ctx context.Context, keys []roachpb.GCRequest_GCKey) error
return nil
}

func (f *fakeGCer) resolveIntentsAsync(
_ context.Context, txn *roachpb.Transaction, intents []roachpb.LockUpdate,
) error {
f.txnIntents = append(f.txnIntents, txnIntents{txn: txn, intents: intents})
func (f *fakeGCer) resolveIntentsAsync(_ context.Context, txn *roachpb.Transaction) error {
f.txnIntents = append(f.txnIntents, txnIntents{txn: txn, intents: txn.LocksAsLockUpdates()})
return nil
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,15 +467,15 @@ func (gcq *gcQueue) process(
}
return err
},
func(ctx context.Context, txn *roachpb.Transaction, intents []roachpb.LockUpdate) error {
func(ctx context.Context, txn *roachpb.Transaction) error {
err := repl.store.intentResolver.
CleanupTxnIntentsOnGCAsync(ctx, repl.RangeID, txn, intents, gcTimestamp,
CleanupTxnIntentsOnGCAsync(ctx, repl.RangeID, txn, gcTimestamp,
func(pushed, succeeded bool) {
if pushed {
gcq.store.metrics.GCPushTxn.Inc(1)
}
if succeeded {
gcq.store.metrics.GCResolveSuccess.Inc(int64(len(intents)))
gcq.store.metrics.GCResolveSuccess.Inc(int64(len(txn.LockSpans)))
}
})
if errors.Is(err, stop.ErrThrottled) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/gc_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ func TestGCQueueProcess(t *testing.T) {
func(ctx context.Context, intents []roachpb.Intent) error {
return nil
},
func(ctx context.Context, txn *roachpb.Transaction, intents []roachpb.LockUpdate) error {
func(ctx context.Context, txn *roachpb.Transaction) error {
return nil
})
}()
Expand Down
28 changes: 12 additions & 16 deletions pkg/kv/kvserver/intentresolver/intent_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,9 +554,8 @@ func (ir *IntentResolver) CleanupTxnIntentsAsync(
return
}
defer release()
intents := et.Txn.LocksAsLockUpdates()
if err := ir.cleanupFinishedTxnIntents(
ctx, rangeID, et.Txn, intents, et.Poison, nil, /* onComplete */
ctx, rangeID, et.Txn, et.Poison, nil, /* onComplete */
); err != nil {
if ir.every.ShouldLog() {
log.Warningf(ctx, "failed to cleanup transaction intents: %v", err)
Expand Down Expand Up @@ -601,7 +600,6 @@ func (ir *IntentResolver) CleanupTxnIntentsOnGCAsync(
ctx context.Context,
rangeID roachpb.RangeID,
txn *roachpb.Transaction,
intents []roachpb.LockUpdate,
now hlc.Timestamp,
onComplete func(pushed, succeeded bool),
) error {
Expand Down Expand Up @@ -653,11 +651,11 @@ func (ir *IntentResolver) CleanupTxnIntentsOnGCAsync(
log.VErrEventf(ctx, 2, "failed to push %s, expired txn (%s): %s", txn.Status, txn, err)
return
}
// Get the pushed txn and update the intents slice.
txn = &b.RawResponse().Responses[0].GetInner().(*roachpb.PushTxnResponse).PusheeTxn
for i := range intents {
intents[i].SetTxn(txn)
}
// Update the txn with the result of the push, such that the intents we're about
// to resolve get a final status.
finalizedTxn := &b.RawResponse().Responses[0].GetInner().(*roachpb.PushTxnResponse).PusheeTxn
txn = txn.Clone()
txn.Update(finalizedTxn)
}
var onCleanupComplete func(error)
if onComplete != nil {
Expand All @@ -669,7 +667,7 @@ func (ir *IntentResolver) CleanupTxnIntentsOnGCAsync(
// Set onComplete to nil to disable the deferred call as the call has now
// been delegated to the callback passed to cleanupFinishedTxnIntents.
onComplete = nil
err := ir.cleanupFinishedTxnIntents(ctx, rangeID, txn, intents, false /* poison */, onCleanupComplete)
err := ir.cleanupFinishedTxnIntents(ctx, rangeID, txn, false /* poison */, onCleanupComplete)
if err != nil {
if ir.every.ShouldLog() {
log.Warningf(ctx, "failed to cleanup transaction intents: %+v", err)
Expand Down Expand Up @@ -727,16 +725,14 @@ func (ir *IntentResolver) gcTxnRecord(
return nil
}

// cleanupFinishedTxnIntents cleans up extant intents owned by a single
// transaction and when all intents have been successfully resolved, the
// transaction record is GC'ed asynchronously. onComplete will be called when
// all processing has completed which is likely to be after this call returns
// in the case of success.
// cleanupFinishedTxnIntents cleans up a txn's extant intents and, when all
// intents have been successfully resolved, the transaction record is GC'ed
// asynchronously. onComplete will be called when all processing has completed
// which is likely to be after this call returns in the case of success.
func (ir *IntentResolver) cleanupFinishedTxnIntents(
ctx context.Context,
rangeID roachpb.RangeID,
txn *roachpb.Transaction,
intents []roachpb.LockUpdate,
poison bool,
onComplete func(error),
) (err error) {
Expand All @@ -749,7 +745,7 @@ func (ir *IntentResolver) cleanupFinishedTxnIntents(
}()
// Resolve intents.
opts := ResolveOptions{Poison: poison, MinTimestamp: txn.MinTimestamp}
if pErr := ir.ResolveIntents(ctx, intents, opts); pErr != nil {
if pErr := ir.ResolveIntents(ctx, txn.LocksAsLockUpdates(), opts); pErr != nil {
return errors.Wrapf(pErr.GoError(), "failed to resolve intents")
}
// Run transaction record GC outside of ir.sem.
Expand Down
40 changes: 22 additions & 18 deletions pkg/kv/kvserver/intentresolver/intent_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
Clock: clock,
}
type testCase struct {
txn *roachpb.Transaction
intents []roachpb.LockUpdate
txn *roachpb.Transaction
// intentSpans, if set, are appended to txn.LockSpans. They'll result in
// ResolveIntent requests.
intentSpans []roachpb.Span
sendFuncs *sendFuncs
expectPushed bool
expectSucceed bool
Expand Down Expand Up @@ -106,9 +108,9 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
// has been pushed but that the garbage collection was not successful.
{
txn: txn1,
intents: []roachpb.LockUpdate{
roachpb.MakeLockUpdate(txn1, roachpb.Span{Key: key}),
roachpb.MakeLockUpdate(txn1, roachpb.Span{Key: key, EndKey: roachpb.Key("b")}),
intentSpans: []roachpb.Span{
{Key: key},
{Key: key, EndKey: roachpb.Key("b")},
},
sendFuncs: newSendFuncs(t,
singlePushTxnSendFunc(t),
Expand All @@ -126,10 +128,10 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
// that the txn has both been pushed and successfully resolved.
{
txn: txn1,
intents: []roachpb.LockUpdate{
roachpb.MakeLockUpdate(txn1, roachpb.Span{Key: key}),
roachpb.MakeLockUpdate(txn1, roachpb.Span{Key: roachpb.Key("aa")}),
roachpb.MakeLockUpdate(txn1, roachpb.Span{Key: key, EndKey: roachpb.Key("b")}),
intentSpans: []roachpb.Span{
{Key: key},
{Key: roachpb.Key("aa")},
{Key: key, EndKey: roachpb.Key("b")},
},
sendFuncs: func() *sendFuncs {
s := newSendFuncs(t)
Expand Down Expand Up @@ -165,9 +167,9 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
// has been pushed but that the garbage collection was not successful.
{
txn: txn3,
intents: []roachpb.LockUpdate{
roachpb.MakeLockUpdate(txn3, roachpb.Span{Key: key}),
roachpb.MakeLockUpdate(txn3, roachpb.Span{Key: key, EndKey: roachpb.Key("b")}),
intentSpans: []roachpb.Span{
{Key: key},
{Key: key, EndKey: roachpb.Key("b")},
},
sendFuncs: newSendFuncs(t,
singlePushTxnSendFunc(t),
Expand All @@ -185,10 +187,10 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
// that the txn has both been pushed and successfully resolved.
{
txn: txn3,
intents: []roachpb.LockUpdate{
roachpb.MakeLockUpdate(txn3, roachpb.Span{Key: key}),
roachpb.MakeLockUpdate(txn3, roachpb.Span{Key: roachpb.Key("aa")}),
roachpb.MakeLockUpdate(txn3, roachpb.Span{Key: key, EndKey: roachpb.Key("b")}),
intentSpans: []roachpb.Span{
{Key: key},
{Key: roachpb.Key("aa")},
{Key: key, EndKey: roachpb.Key("b")},
},
sendFuncs: func() *sendFuncs {
s := newSendFuncs(t)
Expand All @@ -207,7 +209,7 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
// is no push but that the gc has occurred successfully.
{
txn: txn4,
intents: []roachpb.LockUpdate{},
intentSpans: []roachpb.Span{},
sendFuncs: newSendFuncs(t, gcSendFunc(t)),
expectSucceed: true,
},
Expand All @@ -222,7 +224,9 @@ func TestCleanupTxnIntentsOnGCAsync(t *testing.T) {
didPush, didSucceed = pushed, succeeded
close(done)
}
err := ir.CleanupTxnIntentsOnGCAsync(ctx, 1, c.txn, c.intents, clock.Now(), onComplete)
txn := c.txn.Clone()
txn.LockSpans = append([]roachpb.Span{}, c.intentSpans...)
err := ir.CleanupTxnIntentsOnGCAsync(ctx, 1, txn, clock.Now(), onComplete)
if err != nil {
t.Fatalf("unexpected error sending async transaction")
}
Expand Down

0 comments on commit 75dbf3d

Please sign in to comment.