Skip to content

Commit

Permalink
gc: separate out lock table scan in the gc queue
Browse files Browse the repository at this point in the history
Previously, we'd handle resolving intents while scanning through the
MVCC keyspace if we came across one. We would resolve it if it was
older than some configurable threshold. In all this, we never needed
to scan the lock table keyspace directly.

We're introducing replicated shared and exclusive locks. The GC queue
is expected to resolve extant replicated locks, and as such, needs to
concern itself with an explicit lock table scan. This patch removes
intent handling from the scan of the MVCC keyspace. For now, no behavior
changes -- we only look for and resolve intents. In a future patch we'll
extend this behavior to include shared and exclusive replicated locks.

While here, we also modify an existing test
(`TestIntentAgeThresholdSetting`) to include an intent on a range local
key. I've ensured that the test fails if we were to only scan the lock
table corresponding to the global keyspace of a range.

Informs #111215

Release note: None
  • Loading branch information
arulajmani committed Sep 28, 2023
1 parent eb42ad1 commit 6b9599d
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 57 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/gc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/abortspan",
"//pkg/kv/kvserver/benignerror",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/rditer",
"//pkg/roachpb",
"//pkg/settings",
Expand All @@ -24,7 +25,6 @@ go_library(
"//pkg/util/bufalloc",
"//pkg/util/hlc",
"//pkg/util/log",
"//pkg/util/protoutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
Expand Down
152 changes: 102 additions & 50 deletions pkg/kv/kvserver/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/abortspan"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/benignerror"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
Expand All @@ -38,7 +39,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
Expand Down Expand Up @@ -350,9 +350,9 @@ func Run(
Threshold: newThreshold,
}

fastPath, err := processReplicatedKeyRange(ctx, desc, snap, now, newThreshold, options.IntentAgeThreshold,
populateBatcherOptions(options),
gcer,
// Process all replicated locks first and resolve any that have been around
// for longer than the IntentAgeThreshold.
err := processReplicatedLocks(ctx, desc, snap, now, options.IntentAgeThreshold,
intentBatcherOptions{
maxIntentsPerIntentCleanupBatch: options.MaxIntentsPerIntentCleanupBatch,
maxIntentKeyBytesPerIntentCleanupBatch: options.MaxIntentKeyBytesPerIntentCleanupBatch,
Expand All @@ -365,6 +365,14 @@ func Run(
}
return Info{}, err
}
fastPath, err := processReplicatedKeyRange(ctx, desc, snap, newThreshold,
populateBatcherOptions(options), gcer, &info)
if err != nil {
if errors.Is(err, pebble.ErrSnapshotExcised) {
err = benignerror.NewStoreBenign(err)
}
return Info{}, err
}
err = processReplicatedRangeTombstones(ctx, desc, snap, fastPath, now, newThreshold, gcer, &info)
if err != nil {
if errors.Is(err, pebble.ErrSnapshotExcised) {
Expand Down Expand Up @@ -420,19 +428,15 @@ func populateBatcherOptions(options RunOptions) gcKeyBatcherThresholds {
// remove it.
//
// The logic iterates all versions of all keys in the range from oldest to
// newest. Expired intents are written into the txnMap and intentKeyMap.
// Returns true if clear range was used to remove all user data.
// newest. Intents are not handled by this function; they're simply skipped
// over. Returns true if clear range was used to remove user data.
func processReplicatedKeyRange(
ctx context.Context,
desc *roachpb.RangeDescriptor,
snap storage.Reader,
now hlc.Timestamp,
threshold hlc.Timestamp,
intentAgeThreshold time.Duration,
batcherThresholds gcKeyBatcherThresholds,
gcer PureGCer,
options intentBatcherOptions,
cleanupIntentsFn CleanupIntentsFunc,
info *Info,
) (bool, error) {
// Perform fast path check prior to performing GC. Fast path only collects
Expand All @@ -458,40 +462,11 @@ func processReplicatedKeyRange(
}
}

// Compute intent expiration (intent age at which we attempt to resolve).
intentExp := now.Add(-intentAgeThreshold.Nanoseconds(), 0)

return excludeUserKeySpan, rditer.IterateMVCCReplicaKeySpans(desc, snap, rditer.IterateOptions{
CombineRangesAndPoints: true,
Reverse: true,
ExcludeUserKeySpan: excludeUserKeySpan,
}, func(iterator storage.MVCCIterator, span roachpb.Span, keyType storage.IterKeyType) error {
intentBatcher := newIntentBatcher(cleanupIntentsFn, options, info)

// handleIntent will deserialize transaction info and if intent is older than
// threshold enqueue it to batcher, otherwise ignore it.
handleIntent := func(keyValue *mvccKeyValue) error {
meta := &enginepb.MVCCMetadata{}
if err := protoutil.Unmarshal(keyValue.metaValue, meta); err != nil {
log.Errorf(ctx, "unable to unmarshal MVCC metadata for key %q: %+v", keyValue.key, err)
return nil
}
if meta.Txn != nil {
// Keep track of intent to resolve if older than the intent
// expiration threshold.
if meta.Timestamp.ToTimestamp().Less(intentExp) {
info.IntentsConsidered++
if err := intentBatcher.addAndMaybeFlushIntents(ctx, keyValue.key.Key, meta); err != nil {
if errors.Is(err, ctx.Err()) {
return err
}
log.Warningf(ctx, "failed to cleanup intents batch: %v", err)
}
}
}
return nil
}

// Iterate all versions of all keys from oldest to newest. If a version is an
// intent it will have the highest timestamp of any versions and will be
// followed by a metadata entry.
Expand Down Expand Up @@ -527,18 +502,17 @@ func processReplicatedKeyRange(

switch {
case s.curIsNotValue():
// Skip over non mvcc data and intents.
// Skip over non mvcc data.
err = b.foundNonGCableData(ctx, s.cur, true /* isNewestPoint */)
case s.curIsIntent():
// Skip over intents; they cannot be GC-ed. We simply ignore them --
// processReplicatedLocks will resolve them, if necessary.
err = b.foundNonGCableData(ctx, s.cur, true /* isNewestPoint */)
if err != nil {
return err
}
if err = handleIntent(s.next); err != nil {
return err
}
// For intents, we force step over the intent metadata after provisional
// value is found.
// Force step over the intent metadata as well to move on to the next
// key.
it.step()
default:
if isGarbage(threshold, s.cur, s.next, s.curIsNewest(), s.firstRangeTombstoneTsAtOrBelowGC) {
Expand All @@ -552,20 +526,98 @@ func processReplicatedKeyRange(
}
}

err := b.flushLastBatch(ctx)
return b.flushLastBatch(ctx)
})
}

// processReplicatedLocks identifies extant replicated locks which have been
// around longer than the supplied intentAgeThreshold and resolves them.
func processReplicatedLocks(
ctx context.Context,
desc *roachpb.RangeDescriptor,
reader storage.Reader,
now hlc.Timestamp,
// TODO(arul): rename to lockAgeThreshold
intentAgeThreshold time.Duration,
options intentBatcherOptions,
cleanupIntentsFn CleanupIntentsFunc,
info *Info,
) error {
// Compute intent expiration (intent age at which we attempt to resolve).
intentExp := now.Add(-intentAgeThreshold.Nanoseconds(), 0)
intentBatcher := newIntentBatcher(cleanupIntentsFn, options, info)

process := func(ltStartKey, ltEndKey roachpb.Key) error {
opts := storage.LockTableIteratorOptions{
LowerBound: ltStartKey,
UpperBound: ltEndKey,
MatchMinStr: lock.Intent,
}
iter, err := storage.NewLockTableIterator(reader, opts)
if err != nil {
return err
}
defer iter.Close()

// We need to send out last intent cleanup batch.
if err := intentBatcher.maybeFlushPendingIntents(ctx); err != nil {
if errors.Is(err, ctx.Err()) {
var ok bool
for ok, err = iter.SeekEngineKeyGE(storage.EngineKey{Key: ltStartKey}); ok; ok, err = iter.NextEngineKey() {
if err != nil {
return err
}
var meta enginepb.MVCCMetadata
err = iter.ValueProto(&meta)
if err != nil {
return err
}
log.Warningf(ctx, "failed to cleanup intents batch: %v", err)
if meta.Txn == nil {
return errors.AssertionFailedf("intent without transaction")
}
// Keep track of intent to resolve if older than the intent
// expiration threshold.
if meta.Timestamp.ToTimestamp().Less(intentExp) {
info.IntentsConsidered++
key, err := iter.EngineKey()
if err != nil {
return err
}
ltKey, err := key.ToLockTableKey()
if err != nil {
return err
}
if ltKey.Strength != lock.Intent {
return errors.AssertionFailedf("unexpected strength for LockTableKey %s", ltKey.Strength)
}
if err := intentBatcher.addAndMaybeFlushIntents(ctx, ltKey.Key, &meta); err != nil {
if errors.Is(err, ctx.Err()) {
return err
}
log.Warningf(ctx, "failed to cleanup intents batch: %v", err)
}
}
}
return nil
}

// We want to find/resolve replicated locks over both local and global
// keys. That's what the call to Select below will give us.
ltSpans := rditer.Select(desc.RangeID, rditer.SelectOpts{
ReplicatedBySpan: desc.RSpan(),
ReplicatedSpansFilter: rditer.ReplicatedSpansLocksOnly,
})
for _, sp := range ltSpans {
if err := process(sp.Key, sp.EndKey); err != nil {
return err
}
}

// We need to send out last intent cleanup batch, if present.
if err := intentBatcher.maybeFlushPendingIntents(ctx); err != nil {
if errors.Is(err, ctx.Err()) {
return err
}
log.Warningf(ctx, "failed to cleanup intents batch: %v", err)
}
return nil
}

// gcBatchCounters contain statistics about garbage that is collected for the
Expand Down
18 changes: 12 additions & 6 deletions pkg/kv/kvserver/gc/gc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,11 @@ func TestBatchingInlineGCer(t *testing.T) {
require.Zero(t, m.size)
}

// TestIntentAgeThresholdSetting verifies that the GC intent resolution threshold can be
// adjusted. It uses short and long threshold to verify that intents inserted between two
// thresholds are not considered for resolution when threshold is high (1st attempt) and
// considered when threshold is low (2nd attempt).
// TestIntentAgeThresholdSetting verifies that the GC intent resolution
// threshold can be adjusted. It uses short and long threshold to verify that
// intents inserted between two thresholds are not considered for resolution
// when threshold is high (1st attempt) and considered when threshold is low
// (2nd attempt).
func TestIntentAgeThresholdSetting(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand All @@ -131,12 +132,15 @@ func TestIntentAgeThresholdSetting(t *testing.T) {

// Prepare test intents in MVCC.
key := []byte("a")
localKey := keys.MakeRangeKeyPrefix(key)
value := roachpb.Value{RawBytes: []byte("0123456789")}
intentHlc := hlc.Timestamp{
WallTime: intentTs.Nanoseconds(),
}
txn := roachpb.MakeTransaction("txn", key, isolation.Serializable, roachpb.NormalUserPriority, intentHlc, 1000, 0, 0)
// Write two intents -- one for a global key, and another for a local key.
require.NoError(t, storage.MVCCPut(ctx, eng, key, intentHlc, value, storage.MVCCWriteOptions{Txn: &txn}))
require.NoError(t, storage.MVCCPut(ctx, eng, localKey, intentHlc, value, storage.MVCCWriteOptions{Txn: &txn}))
require.NoError(t, eng.Flush())

// Prepare test fixtures for GC run.
Expand All @@ -162,6 +166,7 @@ func TestIntentAgeThresholdSetting(t *testing.T) {
require.NoError(t, err, "GC Run shouldn't fail")
assert.Zero(t, info.IntentsConsidered,
"Expected no intents considered by GC with default threshold")
require.Zero(t, len(gcer.intents))

info, err = Run(ctx, &desc, snap, nowTs, nowTs,
RunOptions{
Expand All @@ -170,8 +175,9 @@ func TestIntentAgeThresholdSetting(t *testing.T) {
}, gcTTL, &gcer, gcer.resolveIntents,
gcer.resolveIntentsAsync)
require.NoError(t, err, "GC Run shouldn't fail")
assert.Equal(t, 1, info.IntentsConsidered,
"Expected 1 intents considered by GC with short threshold")
assert.Equal(t, 2, info.IntentsConsidered,
"Expected 2 intents considered by GC with short threshold")
require.Equal(t, 2, len(gcer.intents))
}

func TestIntentCleanupBatching(t *testing.T) {
Expand Down

0 comments on commit 6b9599d

Please sign in to comment.