Skip to content

Commit

Permalink
Merge pull request #60745 from nvanbenschoten/backport20.2-60619
Browse files Browse the repository at this point in the history
release-20.2: kv: allow manual GC that does not advance threshold
  • Loading branch information
nvanbenschoten authored Feb 18, 2021
2 parents 6b701b5 + 569a45b commit a1b85f5
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 30 deletions.
9 changes: 7 additions & 2 deletions pkg/kv/kvserver/gc_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,15 @@ func (gcq *gcQueue) shouldQueue(
// Consult the protected timestamp state to determine whether we can GC and
// the timestamp which can be used to calculate the score.
_, zone := repl.DescAndZone()
canGC, _, gcTimestamp, _ := repl.checkProtectedTimestampsForGC(ctx, *zone.GC)
canGC, _, gcTimestamp, oldThreshold, newThreshold := repl.checkProtectedTimestampsForGC(ctx, *zone.GC)
if !canGC {
return false, 0
}
// If performing a GC will not advance the GC threshold, there's no reason
// to GC again.
if newThreshold.Equal(oldThreshold) {
return false, 0
}
r := makeGCQueueScore(ctx, repl, gcTimestamp, *zone.GC)
return r.ShouldQueue, r.FinalScore
}
Expand Down Expand Up @@ -442,7 +447,7 @@ func (gcq *gcQueue) process(
// Consult the protected timestamp state to determine whether we can GC and
// the timestamp which can be used to calculate the score and updated GC
// threshold.
canGC, cacheTimestamp, gcTimestamp, newThreshold := repl.checkProtectedTimestampsForGC(ctx, *zone.GC)
canGC, cacheTimestamp, gcTimestamp, _, newThreshold := repl.checkProtectedTimestampsForGC(ctx, *zone.GC)
if !canGC {
return false, nil
}
Expand Down
32 changes: 13 additions & 19 deletions pkg/kv/kvserver/replica_protected_timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,20 +220,20 @@ func (r *Replica) protectedTimestampRecordCurrentlyApplies(
return false, read.readAt.Less(args.RecordAliveAt), nil
}

// checkProtectedTimestampsForGC determines whether the Replica can run GC.
// If the Replica can run GC, this method returns the latest timestamp which
// can be used to determine a valid new GCThreshold. The policy is passed in
// rather than read from the replica state to ensure that the same value used
// for this calculation is used later.
// checkProtectedTimestampsForGC determines whether the Replica can run GC. If
// the Replica can run GC, this method returns the latest timestamp which can be
// used to determine a valid new GCThreshold. The policy is passed in rather
// than read from the replica state to ensure that the same value used for this
// calculation is used later.
//
// In the case that GC can proceed, three timestamps are returned: The timestamp
// In the case that GC can proceed, four timestamps are returned: The timestamp
// corresponding to the state of the cache used to make the determination (used
// for markPendingGC when actually performing GC), the timestamp used as the
// basis to calculate the new gc threshold (used for scoring and reporting), and
// the new gc threshold itself.
// basis to calculate the new gc threshold (used for scoring and reporting), the
// old gc threshold, and the new gc threshold.
func (r *Replica) checkProtectedTimestampsForGC(
ctx context.Context, policy zonepb.GCPolicy,
) (canGC bool, cacheTimestamp, gcTimestamp, newThreshold hlc.Timestamp) {
) (canGC bool, cacheTimestamp, gcTimestamp, oldThreshold, newThreshold hlc.Timestamp) {

// We may be reading the protected timestamp cache while we're holding
// the Replica.mu for reading. If we do so and find newer state in the cache
Expand All @@ -247,10 +247,10 @@ func (r *Replica) checkProtectedTimestampsForGC(
defer r.mu.RUnlock()
defer read.clearIfNotNewer(r.mu.cachedProtectedTS)

gcThreshold := *r.mu.state.GCThreshold
oldThreshold = *r.mu.state.GCThreshold
lease := *r.mu.state.Lease

// earliestValidRecord is the record with the earliest timestamp which is
// read.earliestRecord is the record with the earliest timestamp which is
// greater than the existing gcThreshold.
read = r.readProtectedTimestampsRLocked(ctx, nil)
gcTimestamp = read.readAt
Expand All @@ -266,18 +266,12 @@ func (r *Replica) checkProtectedTimestampsForGC(
if gcTimestamp.Less(lease.Start) {
log.VEventf(ctx, 1, "not gc'ing replica %v due to new lease %v started after %v",
r, lease, gcTimestamp)
return false, hlc.Timestamp{}, hlc.Timestamp{}, hlc.Timestamp{}
return false, hlc.Timestamp{}, hlc.Timestamp{}, hlc.Timestamp{}, hlc.Timestamp{}
}

newThreshold = gc.CalculateThreshold(gcTimestamp, policy)

// If we've already GC'd right up to this record, there's no reason to
// gc again.
if newThreshold.Equal(gcThreshold) {
return false, hlc.Timestamp{}, hlc.Timestamp{}, hlc.Timestamp{}
}

return true, read.readAt, gcTimestamp, newThreshold
return true, read.readAt, gcTimestamp, oldThreshold, newThreshold
}

// markPendingGC is called just prior to sending the GC request to increase the
Expand Down
22 changes: 13 additions & 9 deletions pkg/kv/kvserver/replica_protected_timestamp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func TestCheckProtectedTimestampsForGC(t *testing.T) {
name: "lease is too new",
test: func(t *testing.T, r *Replica, mt *manualCache) {
r.mu.state.Lease.Start = r.store.Clock().Now()
canGC, _, gcTimestamp, _ := r.checkProtectedTimestampsForGC(ctx, makePolicy(10))
canGC, _, gcTimestamp, _, _ := r.checkProtectedTimestampsForGC(ctx, makePolicy(10))
require.False(t, canGC)
require.Zero(t, gcTimestamp)
},
Expand All @@ -337,7 +337,7 @@ func TestCheckProtectedTimestampsForGC(t *testing.T) {
})
// We should allow gc to proceed with the normal new threshold if that
// threshold is earlier than all of the records.
canGC, _, gcTimestamp, _ := r.checkProtectedTimestampsForGC(ctx, makePolicy(10))
canGC, _, gcTimestamp, _, _ := r.checkProtectedTimestampsForGC(ctx, makePolicy(10))
require.True(t, canGC)
require.Equal(t, mt.asOf, gcTimestamp)
},
Expand All @@ -362,8 +362,9 @@ func TestCheckProtectedTimestampsForGC(t *testing.T) {
// We should allow gc to proceed up to the timestamp which precedes the
// protected timestamp. This means we expect a GC timestamp 10 seconds
// after ts.Prev() given the policy.
canGC, _, gcTimestamp, _ := r.checkProtectedTimestampsForGC(ctx, makePolicy(10))
canGC, _, gcTimestamp, oldThreshold, newThreshold := r.checkProtectedTimestampsForGC(ctx, makePolicy(10))
require.True(t, canGC)
require.False(t, newThreshold.Equal(oldThreshold))
require.Equal(t, ts.Prev().Add(10*time.Second.Nanoseconds(), 0), gcTimestamp)
},
},
Expand All @@ -386,11 +387,14 @@ func TestCheckProtectedTimestampsForGC(t *testing.T) {
},
},
})
// We should not allow GC if the threshold is already the predecessor
// of the earliest valid record.
canGC, _, gcTimestamp, _ := r.checkProtectedTimestampsForGC(ctx, makePolicy(10))
require.False(t, canGC)
require.Zero(t, gcTimestamp)
// We should allow GC even if the threshold is already the
// predecessor of the earliest valid record. However, the GC
// queue does not enqueue ranges in such cases, so this is only
// applicable to manually enqueued ranges.
canGC, _, gcTimestamp, oldThreshold, newThreshold := r.checkProtectedTimestampsForGC(ctx, makePolicy(10))
require.True(t, canGC)
require.True(t, newThreshold.Equal(oldThreshold))
require.Equal(t, th.Add(10*time.Second.Nanoseconds(), 0), gcTimestamp)
},
},
{
Expand All @@ -411,7 +415,7 @@ func TestCheckProtectedTimestampsForGC(t *testing.T) {
},
},
})
canGC, _, gcTimestamp, _ := r.checkProtectedTimestampsForGC(ctx, makePolicy(10))
canGC, _, gcTimestamp, _, _ := r.checkProtectedTimestampsForGC(ctx, makePolicy(10))
require.True(t, canGC)
require.Equal(t, mt.asOf, gcTimestamp)
},
Expand Down

0 comments on commit a1b85f5

Please sign in to comment.