kv/closedts: remove use and propagation of synthetic timestamp bit #117016

Merged
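
This PR strips the `Synthetic` bit from closed-timestamp plumbing. As orientation before the diffs, here is a minimal, self-contained sketch of the comparison idiom the change moves to; `Timestamp` below is a toy stand-in for `hlc.Timestamp`, not the real type in `pkg/util/hlc` (which, before this PR, also carried a `Synthetic` bool):

```go
package main

import "fmt"

// Timestamp is a toy model of hlc.Timestamp, abridged for illustration.
type Timestamp struct {
	WallTime int64 // nanoseconds
	Logical  int32
}

// Less mirrors hlc.Timestamp.Less: lexicographic on (WallTime, Logical).
func (t Timestamp) Less(o Timestamp) bool {
	return t.WallTime < o.WallTime ||
		(t.WallTime == o.WallTime && t.Logical < o.Logical)
}

func main() {
	// With the Synthetic bit gone, a transaction whose commit timestamp
	// was pushed into the future is detected by comparison, as in the
	// updated client_split_test.go below.
	minTS := Timestamp{WallTime: 10}
	writeTS := Timestamp{WallTime: 20}
	fmt.Println("pushed:", minTS.Less(writeTS)) // pushed: true
}
```
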
31 changes: 19 additions & 12 deletions pkg/kv/kvserver/client_split_test.go
@@ -3602,28 +3602,32 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
defer log.Scope(t).Close(t)

// Detect splits and merges over the global read ranges. Assert that the split
// and merge transactions commit with synthetic timestamps, and that the
// and merge transactions commit with pushed write timestamps, and that the
// commit-wait sleep for these transactions is performed before running their
// commit triggers instead of run on the kv client. For details on why this is
// necessary, see maybeCommitWaitBeforeCommitTrigger.
var clock atomic.Value
var splitsWithSyntheticTS, mergesWithSyntheticTS int64
var clockPtr atomic.Pointer[hlc.Clock]
var splits, merges int64
respFilter := func(ctx context.Context, ba *kvpb.BatchRequest, br *kvpb.BatchResponse) *kvpb.Error {
clock := clockPtr.Load()
if clock == nil {
return nil
}
if req, ok := ba.GetArg(kvpb.EndTxn); ok {
endTxn := req.(*kvpb.EndTxnRequest)
if br.Txn.Status == roachpb.COMMITTED && br.Txn.WriteTimestamp.Synthetic {
if br.Txn.Status == roachpb.COMMITTED && br.Txn.MinTimestamp.Less(br.Txn.WriteTimestamp) {
if ct := endTxn.InternalCommitTrigger; ct != nil {
// The server-side commit-wait sleep should ensure that the commit
// triggers are only run after the commit timestamp is below present
// time.
now := clock.Load().(*hlc.Clock).Now()
now := clock.Now()
require.True(t, br.Txn.WriteTimestamp.Less(now))

switch {
case ct.SplitTrigger != nil:
atomic.AddInt64(&splitsWithSyntheticTS, 1)
atomic.AddInt64(&splits, 1)
case ct.MergeTrigger != nil:
atomic.AddInt64(&mergesWithSyntheticTS, 1)
atomic.AddInt64(&merges, 1)
}
}
}
@@ -3655,7 +3659,6 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '20ms'`)
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`)
tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.closed_timestamp_refresh_interval = '20ms'`)
clock.Store(s.Clock())
store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
require.NoError(t, err)
config.TestingSetupZoneConfigHook(s.Stopper())
@@ -3667,6 +3670,10 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
_, pErr := kv.SendWrapped(ctx, store.TestSender(), splitArgs)
require.Nil(t, pErr)

// Set the clock to the store's clock, which also serves to engage the
// response filter.
clockPtr.Store(s.Clock())

// Perform a write to the system config span being watched by
// the SystemConfigProvider.
tdb.Exec(t, "CREATE TABLE foo ()")
@@ -3685,8 +3692,8 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
if splitCount != store.Metrics().CommitWaitsBeforeCommitTrigger.Count() {
return errors.Errorf("commit wait count is %d", store.Metrics().CommitWaitsBeforeCommitTrigger.Count())
}
if splitCount != atomic.LoadInt64(&splitsWithSyntheticTS) {
return errors.Errorf("num splits is %d", atomic.LoadInt64(&splitsWithSyntheticTS))
if splitCount != atomic.LoadInt64(&splits) {
return errors.Errorf("num splits is %d", atomic.LoadInt64(&splits))
}
return nil
})
@@ -3703,7 +3710,7 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
require.Nil(t, pErr)
splitCount++
require.Equal(t, splitCount, store.Metrics().CommitWaitsBeforeCommitTrigger.Count())
require.Equal(t, splitCount, atomic.LoadInt64(&splitsWithSyntheticTS))
require.Equal(t, splitCount, atomic.LoadInt64(&splits))

repl := store.LookupReplica(roachpb.RKey(splitKey))
require.Equal(t, splitKey, repl.Desc().StartKey.AsRawKey())
@@ -3713,7 +3720,7 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
_, pErr = kv.SendWrapped(ctx, store.TestSender(), mergeArgs)
require.Nil(t, pErr)
require.Equal(t, splitCount+1, store.Metrics().CommitWaitsBeforeCommitTrigger.Count())
require.Equal(t, int64(1), atomic.LoadInt64(&mergesWithSyntheticTS))
require.Equal(t, int64(1), atomic.LoadInt64(&merges))

repl = store.LookupReplica(roachpb.RKey(splitKey))
require.Equal(t, descKey, repl.Desc().StartKey.AsRawKey())
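
Aside from the synthetic-bit removal, the test above switches its clock handoff from `atomic.Value` to `atomic.Pointer[hlc.Clock]`. A minimal sketch of that pattern with toy names (`Clock` is a stand-in, not the real `hlc.Clock`):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// Clock is a toy stand-in for *hlc.Clock.
type Clock struct{}

func (c *Clock) Now() time.Time { return time.Now() }

func main() {
	// atomic.Pointer[T] (Go 1.19+) needs no type assertion on Load, and
	// a nil result doubles as a "not engaged yet" signal, which is how
	// the response filter in the test arms itself partway through.
	var clockPtr atomic.Pointer[Clock]

	filter := func() {
		clock := clockPtr.Load()
		if clock == nil {
			return // filter not engaged yet
		}
		fmt.Println("now:", clock.Now())
	}

	filter()                 // no-op: nothing stored yet
	clockPtr.Store(&Clock{}) // engage the filter
	filter()                 // runs
}
```
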
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/closedts/policy.go
@@ -121,8 +121,7 @@ func TargetForPolicy(
leadTimeAtSender = leadTargetOverride
}

// Mark as synthetic, because this time is in the future.
res = now.ToTimestamp().Add(leadTimeAtSender.Nanoseconds(), 0).WithSynthetic(true)
res = now.ToTimestamp().Add(leadTimeAtSender.Nanoseconds(), 0)
default:
panic("unexpected RangeClosedTimestampPolicy")
}
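
For reference, a hedged sketch of the lead-time arithmetic `TargetForPolicy` performs for `LEAD_FOR_GLOBAL_READS`, using the propagation and buffer durations exercised in the test below; `maxClockOffset` here is an assumed illustrative value, and the real constants live in `pkg/kv/kvserver/closedts`:

```go
package main

import (
	"fmt"
	"time"
)

// These mirror the durations exercised in policy_test.go below;
// maxClockOffset is an assumption, not read from a cluster setting.
const (
	maxClockOffset        = 500 * time.Millisecond
	sideTransportPropTime = 275 * time.Millisecond
	bufferTime            = 25 * time.Millisecond
)

func main() {
	// For LEAD_FOR_GLOBAL_READS the target leads the local clock; after
	// this PR the result is an ordinary future timestamp, with no
	// synthetic marking.
	leadTimeAtSender := maxClockOffset + sideTransportPropTime + bufferTime
	target := time.Now().Add(leadTimeAtSender)
	fmt.Println("lead:", leadTimeAtSender, "target:", target.Format(time.RFC3339Nano))
}
```
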
8 changes: 3 additions & 5 deletions pkg/kv/kvserver/closedts/policy_test.go
@@ -55,23 +55,21 @@ func TestTargetForPolicy(t *testing.T) {
expClosedTSTarget: now.
Add((maxClockOffset +
millis(275) /* sideTransportPropTime */ +
millis(25) /* bufferTime */).Nanoseconds(), 0).
WithSynthetic(true),
millis(25) /* bufferTime */).Nanoseconds(), 0),
},
{
sideTransportCloseInterval: millis(50),
rangePolicy: roachpb.LEAD_FOR_GLOBAL_READS,
expClosedTSTarget: now.
Add((maxClockOffset +
millis(245) /* raftTransportPropTime */ +
millis(25) /* bufferTime */).Nanoseconds(), 0).
WithSynthetic(true),
millis(25) /* bufferTime */).Nanoseconds(), 0),
},
{
leadTargetOverride: millis(1234),
sideTransportCloseInterval: millis(200),
rangePolicy: roachpb.LEAD_FOR_GLOBAL_READS,
expClosedTSTarget: now.Add(millis(1234).Nanoseconds(), 0).WithSynthetic(true),
expClosedTSTarget: now.Add(millis(1234).Nanoseconds(), 0),
},
} {
t.Run("", func(t *testing.T) {
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/closedts/sidetransport/receiver_test.go
@@ -62,9 +62,9 @@ func (m *mockStores) getAndClearRecording() []rangeUpdate {
var ts10 = hlc.Timestamp{WallTime: 10}
var ts11 = hlc.Timestamp{WallTime: 11}
var ts12 = hlc.Timestamp{WallTime: 12}
var ts20 = hlc.Timestamp{WallTime: 20, Synthetic: true}
var ts21 = hlc.Timestamp{WallTime: 21, Synthetic: true}
var ts22 = hlc.Timestamp{WallTime: 22, Synthetic: true}
var ts20 = hlc.Timestamp{WallTime: 20}
var ts21 = hlc.Timestamp{WallTime: 21}
var ts22 = hlc.Timestamp{WallTime: 22}
var laiZero = kvpb.LeaseAppliedIndex(0)

const lai100 = kvpb.LeaseAppliedIndex(100)
33 changes: 7 additions & 26 deletions pkg/kv/kvserver/closedts/tracker/lockfree_tracker.go
@@ -156,13 +156,13 @@ func (t *lockfreeTracker) Track(ctx context.Context, ts hlc.Timestamp) RemovalToken {
// b1 and the rest (the "high" ones) go on to create and join b2). But that's
// harder to implement.
if !initialized || wts <= t1 {
return b1.extendAndJoin(ctx, wts, ts.Synthetic)
return b1.extendAndJoin(ctx, wts)
}

// We know that b1 < wts. We can technically join either bucket, but we always
// prefer b2 in order to let b1 drain as soon as possible (at which point
// we'll be able to create a new bucket).
return b2.extendAndJoin(ctx, wts, ts.Synthetic)
return b2.extendAndJoin(ctx, wts)
}

// Untrack is part of the Tracker interface.
@@ -176,7 +176,6 @@ func (t *lockfreeTracker) Untrack(ctx context.Context, tok RemovalToken) {
if b.refcnt == 0 {
// Reset the bucket, so that future Track() calls can create a new one.
b.ts = 0
b.synthetic = 0
// If we reset b1, swap the pointers, so that, if b2 is currently
// initialized, it becomes b1. If a single bucket is initialized, we want it
// to be b1.
@@ -196,9 +195,8 @@ func (t *lockfreeTracker) LowerBound(ctx context.Context) hlc.Timestamp {
return hlc.Timestamp{}
}
return hlc.Timestamp{
WallTime: ts,
Logical: 0,
Synthetic: t.b1.isSynthetic(),
WallTime: ts,
Logical: 0,
}
}

@@ -213,9 +211,8 @@ func (t *lockfreeTracker) Count() int {
// A bucket can be initialized or uninitialized. It's initialized when the ts is
// set.
type bucket struct {
ts int64 // atomic, nanos
refcnt int32 // atomic
synthetic int32 // atomic
ts int64 // atomic, nanos
refcnt int32 // atomic
}

func (b *bucket) String() string {
@@ -234,18 +231,12 @@ func (b *bucket) timestamp() (int64, bool) {
return ts, ts != 0
}

// isSynthetic returns true if the bucket's timestamp (i.e. the bucket's lower
// bound) should be considered a synthetic timestamp.
func (b *bucket) isSynthetic() bool {
return atomic.LoadInt32(&b.synthetic) != 0
}

// extendAndJoin extends the bucket downwards (if necessary) so that its
// timestamp is <= ts, and then adds a timestamp to the bucket. It returns a
// token to be used for removing the timestamp from the bucket.
//
// If the bucket is not initialized, it will be initialized to ts.
func (b *bucket) extendAndJoin(ctx context.Context, ts int64, synthetic bool) lockfreeToken {
func (b *bucket) extendAndJoin(ctx context.Context, ts int64) lockfreeToken {
// Loop until either we set the bucket's timestamp, or someone else sets it to
// an even lower value.
var t int64
@@ -258,16 +249,6 @@ func (b *bucket) extendAndJoin(ctx context.Context, ts int64, synthetic bool) lockfreeToken {
break
}
}
// If we created the bucket, then we dictate if its lower bound will be
// considered a synthetic timestamp or not. It's possible that we're now
// inserting a synthetic timestamp into the bucket but, over time, a higher
// non-synthetic timestamp joins. Or, that a lower non-synthetic timestamp
// joins. In either case, the bucket will remain "synthetic" although it'd be
// correct to make it non-synthetic. We don't make an effort to keep the
// synthetic bit up to date within a bucket.
if t == 0 && synthetic {
atomic.StoreInt32(&b.synthetic, 1)
}
atomic.AddInt32(&b.refcnt, 1)
return lockfreeToken{b: b}
}
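
The loop `extendAndJoin` describes above ("loop until either we set the bucket's timestamp, or someone else sets it to an even lower value") is a compare-and-swap retry loop. A self-contained sketch of that idiom with a toy `bucket`, kept to the two fields that survive this PR:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// bucket is a toy version of the lockfree tracker bucket above.
type bucket struct {
	ts     atomic.Int64 // nanos; 0 means uninitialized
	refcnt atomic.Int32
}

// extendAndJoin mirrors the CAS loop in lockfree_tracker.go: extend the
// bucket downwards until its timestamp is <= ts, then join it.
func (b *bucket) extendAndJoin(ts int64) {
	for {
		t := b.ts.Load()
		if t != 0 && t <= ts {
			break // bucket already holds a low-enough lower bound
		}
		if b.ts.CompareAndSwap(t, ts) {
			break // we initialized or lowered the bucket's timestamp
		}
		// Lost the race to another goroutine; reload and retry.
	}
	b.refcnt.Add(1)
}

func main() {
	var b bucket
	b.extendAndJoin(100)
	b.extendAndJoin(50) // extends the bucket downwards
	fmt.Println(b.ts.Load(), b.refcnt.Load()) // 50 2
}
```
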
5 changes: 0 additions & 5 deletions pkg/kv/kvserver/closedts/tracker/tracker.go
@@ -81,11 +81,6 @@ type Tracker interface {
// The returned timestamp might be smaller than the lowest timestamp ever
// inserted into the set. Implementations are allowed to round timestamps
// down.
//
// Synthetic timestamps: The Tracker doesn't necessarily track synthetic /
// physical timestamps precisely; the only guarantee implementations need to
// make is that, if no synthetic timestamp is inserted into the tracked set
// for a while, eventually the LowerBound value will not be synthetic.
LowerBound(context.Context) hlc.Timestamp

// Count returns the current size of the tracked set.
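
To make the interface contract concrete, here is a toy, mutex-based model of the `Track`/`Untrack`/`LowerBound` flow; the names and the integer token are illustrative, not the real `RemovalToken` machinery:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// Timestamp stands in for hlc.Timestamp; only ordering matters here.
type Timestamp struct{ WallTime int64 }

func (t Timestamp) Less(o Timestamp) bool { return t.WallTime < o.WallTime }

// simpleTracker models the contract above: LowerBound returns a lower
// bound on the tracked set (implementations may round down), and the
// zero Timestamp for an empty set.
type simpleTracker struct {
	mu  sync.Mutex
	set map[int]Timestamp
	id  int
}

func (t *simpleTracker) Track(_ context.Context, ts Timestamp) int {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.set == nil {
		t.set = map[int]Timestamp{}
	}
	t.id++
	t.set[t.id] = ts
	return t.id // removal token
}

func (t *simpleTracker) Untrack(_ context.Context, tok int) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.set, tok)
}

func (t *simpleTracker) LowerBound(_ context.Context) Timestamp {
	t.mu.Lock()
	defer t.mu.Unlock()
	var lb Timestamp
	for _, ts := range t.set {
		if lb.WallTime == 0 || ts.Less(lb) {
			lb = ts
		}
	}
	return lb
}

func main() {
	ctx := context.Background()
	tr := &simpleTracker{}
	tok := tr.Track(ctx, Timestamp{WallTime: 30})
	fmt.Println(tr.LowerBound(ctx)) // {30}
	tr.Untrack(ctx, tok)
	fmt.Println(tr.LowerBound(ctx)) // {0}: empty set
}
```
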
13 changes: 0 additions & 13 deletions pkg/kv/kvserver/closedts/tracker/tracker_test.go
@@ -79,19 +79,6 @@ func testTracker(ctx context.Context, t *testing.T, tr Tracker) {
}
tr.Untrack(ctx, tok30)
require.True(t, tr.LowerBound(ctx).IsEmpty())

// Check that synthetic timestamps are tracked as such.
synthTS := hlc.Timestamp{
WallTime: 10,
Synthetic: true,
}
tok := tr.Track(ctx, synthTS)
require.Equal(t, synthTS, tr.LowerBound(ctx))
// Check that after the Tracker is emptied, lowerbounds are not synthetic any
// more.
tr.Untrack(ctx, tok)
tr.Track(ctx, ts(10))
require.Equal(t, ts(10), tr.LowerBound(ctx))
}

// Test the tracker by throwing random requests at it. We verify that, at all
7 changes: 1 addition & 6 deletions pkg/kv/kvserver/kvserverpb/lease_status.go
@@ -60,10 +60,5 @@ func (st LeaseStatus) Expiration() hlc.Timestamp {
// Until a new lease is acquired, all writes will be pushed into this last
// nanosecond of the lease.
func (st LeaseStatus) ClosedTimestampUpperBound() hlc.Timestamp {
// HACK(andrei): We declare the lease expiration to be synthetic by fiat,
// because it frequently is synthetic even though currently it's not marked
// as such. See the TODO in Timestamp.Add() about the work remaining to
// properly mark these timestamps as synthetic. We need to make sure it's
// synthetic here so that the results of Backwards() can be synthetic.
return st.Expiration().WithSynthetic(true).WallPrev()
return st.Expiration().WallPrev()
}
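
A sketch of what `ClosedTimestampUpperBound` now computes, assuming `WallPrev` steps the wall clock back one nanosecond and clears the logical component, per its use here (`Timestamp` is a toy type, not the real `hlc` one):

```go
package main

import "fmt"

// Timestamp is a toy hlc.Timestamp; WallPrev is modeled on its use in
// lease_status.go above.
type Timestamp struct {
	WallTime int64
	Logical  int32
}

func (t Timestamp) WallPrev() Timestamp {
	return Timestamp{WallTime: t.WallTime - 1, Logical: 0}
}

func main() {
	// Writes under the lease are pushed into its last nanosecond, so the
	// closed-timestamp upper bound is one wall tick before expiration;
	// after this PR no synthetic marking is involved.
	expiration := Timestamp{WallTime: 1_000_000, Logical: 5}
	fmt.Println(expiration.WallPrev()) // {999999 0}
}
```
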
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
@@ -924,8 +924,7 @@ func TestProposalBufferClosedTimestamp(t *testing.T) {
nowMinusTwiceClosedLag := nowTS.Add(-2*closedts.TargetDuration.Get(&st.SV).Nanoseconds(), 0)
nowPlusGlobalReadLead := nowTS.Add((maxOffset +
275*time.Millisecond /* sideTransportPropTime */ +
25*time.Millisecond /* bufferTime */).Nanoseconds(), 0).
WithSynthetic(true)
25*time.Millisecond /* bufferTime */).Nanoseconds(), 0)
expiredLeaseTimestamp := nowTS.Add(-1000, 0)
someClosedTS := nowTS.Add(-2000, 0)
