Skip to content

Commit

Permalink
kv/closedts: don't set synthetic timestamp bit on global table closed ts
Browse files Browse the repository at this point in the history
Informs #101938.

This commit stops setting the synthetic timestamp bit on the closed
timestamps selected for ranges with a LEAD_FOR_GLOBAL_READS closed
timestamp policy.

This flag has been deprecated since v22.2 and is no longer consulted in
uncertainty interval checks or by transaction commit-wait. It does not
need to be propagated from evaluating requests to the closed timestamp.

Release note: None
  • Loading branch information
nvanbenschoten committed Dec 27, 2023
1 parent 9f477d2 commit e8440fc
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 30 deletions.
31 changes: 19 additions & 12 deletions pkg/kv/kvserver/client_split_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3602,28 +3602,32 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
defer log.Scope(t).Close(t)

// Detect splits and merges over the global read ranges. Assert that the split
// and merge transactions commit with synthetic timestamps, and that the
// and merge transactions commit with pushed write timestamps, and that the
// commit-wait sleep for these transactions is performed before running their
// commit triggers instead of run on the kv client. For details on why this is
// necessary, see maybeCommitWaitBeforeCommitTrigger.
var clock atomic.Value
var splitsWithSyntheticTS, mergesWithSyntheticTS int64
var clockPtr atomic.Pointer[hlc.Clock]
var splits, merges int64
respFilter := func(ctx context.Context, ba *kvpb.BatchRequest, br *kvpb.BatchResponse) *kvpb.Error {
clock := clockPtr.Load()
if clock == nil {
return nil
}
if req, ok := ba.GetArg(kvpb.EndTxn); ok {
endTxn := req.(*kvpb.EndTxnRequest)
if br.Txn.Status == roachpb.COMMITTED && br.Txn.WriteTimestamp.Synthetic {
if br.Txn.Status == roachpb.COMMITTED && br.Txn.MinTimestamp.Less(br.Txn.WriteTimestamp) {
if ct := endTxn.InternalCommitTrigger; ct != nil {
// The server-side commit-wait sleep should ensure that the commit
// triggers are only run after the commit timestamp is below present
// time.
now := clock.Load().(*hlc.Clock).Now()
now := clock.Now()
require.True(t, br.Txn.WriteTimestamp.Less(now))

switch {
case ct.SplitTrigger != nil:
atomic.AddInt64(&splitsWithSyntheticTS, 1)
atomic.AddInt64(&splits, 1)
case ct.MergeTrigger != nil:
atomic.AddInt64(&mergesWithSyntheticTS, 1)
atomic.AddInt64(&merges, 1)
}
}
}
Expand Down Expand Up @@ -3655,7 +3659,6 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '20ms'`)
tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`)
tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.closed_timestamp_refresh_interval = '20ms'`)
clock.Store(s.Clock())
store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
require.NoError(t, err)
config.TestingSetupZoneConfigHook(s.Stopper())
Expand All @@ -3667,6 +3670,10 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
_, pErr := kv.SendWrapped(ctx, store.TestSender(), splitArgs)
require.Nil(t, pErr)

// Set the clock to the store's clock, which also serves to engage the
// response filter.
clockPtr.Store(s.Clock())

// Perform a write to the system config span being watched by
// the SystemConfigProvider.
tdb.Exec(t, "CREATE TABLE foo ()")
Expand All @@ -3685,8 +3692,8 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
if splitCount != store.Metrics().CommitWaitsBeforeCommitTrigger.Count() {
return errors.Errorf("commit wait count is %d", store.Metrics().CommitWaitsBeforeCommitTrigger.Count())
}
if splitCount != atomic.LoadInt64(&splitsWithSyntheticTS) {
return errors.Errorf("num splits is %d", atomic.LoadInt64(&splitsWithSyntheticTS))
if splitCount != atomic.LoadInt64(&splits) {
return errors.Errorf("num splits is %d", atomic.LoadInt64(&splits))
}
return nil
})
Expand All @@ -3703,7 +3710,7 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
require.Nil(t, pErr)
splitCount++
require.Equal(t, splitCount, store.Metrics().CommitWaitsBeforeCommitTrigger.Count())
require.Equal(t, splitCount, atomic.LoadInt64(&splitsWithSyntheticTS))
require.Equal(t, splitCount, atomic.LoadInt64(&splits))

repl := store.LookupReplica(roachpb.RKey(splitKey))
require.Equal(t, splitKey, repl.Desc().StartKey.AsRawKey())
Expand All @@ -3713,7 +3720,7 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
_, pErr = kv.SendWrapped(ctx, store.TestSender(), mergeArgs)
require.Nil(t, pErr)
require.Equal(t, splitCount+1, store.Metrics().CommitWaitsBeforeCommitTrigger.Count())
require.Equal(t, int64(1), atomic.LoadInt64(&mergesWithSyntheticTS))
require.Equal(t, int64(1), atomic.LoadInt64(&merges))

repl = store.LookupReplica(roachpb.RKey(splitKey))
require.Equal(t, descKey, repl.Desc().StartKey.AsRawKey())
Expand Down
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/closedts/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,7 @@ func TargetForPolicy(
leadTimeAtSender = leadTargetOverride
}

// Mark as synthetic, because this time is in the future.
res = now.ToTimestamp().Add(leadTimeAtSender.Nanoseconds(), 0).WithSynthetic(true)
res = now.ToTimestamp().Add(leadTimeAtSender.Nanoseconds(), 0)
default:
panic("unexpected RangeClosedTimestampPolicy")
}
Expand Down
8 changes: 3 additions & 5 deletions pkg/kv/kvserver/closedts/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,23 +55,21 @@ func TestTargetForPolicy(t *testing.T) {
expClosedTSTarget: now.
Add((maxClockOffset +
millis(275) /* sideTransportPropTime */ +
millis(25) /* bufferTime */).Nanoseconds(), 0).
WithSynthetic(true),
millis(25) /* bufferTime */).Nanoseconds(), 0),
},
{
sideTransportCloseInterval: millis(50),
rangePolicy: roachpb.LEAD_FOR_GLOBAL_READS,
expClosedTSTarget: now.
Add((maxClockOffset +
millis(245) /* raftTransportPropTime */ +
millis(25) /* bufferTime */).Nanoseconds(), 0).
WithSynthetic(true),
millis(25) /* bufferTime */).Nanoseconds(), 0),
},
{
leadTargetOverride: millis(1234),
sideTransportCloseInterval: millis(200),
rangePolicy: roachpb.LEAD_FOR_GLOBAL_READS,
expClosedTSTarget: now.Add(millis(1234).Nanoseconds(), 0).WithSynthetic(true),
expClosedTSTarget: now.Add(millis(1234).Nanoseconds(), 0),
},
} {
t.Run("", func(t *testing.T) {
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/closedts/sidetransport/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ func (m *mockStores) getAndClearRecording() []rangeUpdate {
var ts10 = hlc.Timestamp{WallTime: 10}
var ts11 = hlc.Timestamp{WallTime: 11}
var ts12 = hlc.Timestamp{WallTime: 12}
var ts20 = hlc.Timestamp{WallTime: 20, Synthetic: true}
var ts21 = hlc.Timestamp{WallTime: 21, Synthetic: true}
var ts22 = hlc.Timestamp{WallTime: 22, Synthetic: true}
var ts20 = hlc.Timestamp{WallTime: 20}
var ts21 = hlc.Timestamp{WallTime: 21}
var ts22 = hlc.Timestamp{WallTime: 22}
var laiZero = kvpb.LeaseAppliedIndex(0)

const lai100 = kvpb.LeaseAppliedIndex(100)
Expand Down
7 changes: 1 addition & 6 deletions pkg/kv/kvserver/kvserverpb/lease_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,5 @@ func (st LeaseStatus) Expiration() hlc.Timestamp {
// Until a new lease is acquired, all writes will be pushed into this last
// nanosecond of the lease.
func (st LeaseStatus) ClosedTimestampUpperBound() hlc.Timestamp {
// HACK(andrei): We declare the lease expiration to be synthetic by fiat,
// because it frequently is synthetic even though currently it's not marked
// as such. See the TODO in Timestamp.Add() about the work remaining to
// properly mark these timestamps as synthetic. We need to make sure it's
// synthetic here so that the results of Backwards() can be synthetic.
return st.Expiration().WithSynthetic(true).WallPrev()
return st.Expiration().WallPrev()
}
3 changes: 1 addition & 2 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -924,8 +924,7 @@ func TestProposalBufferClosedTimestamp(t *testing.T) {
nowMinusTwiceClosedLag := nowTS.Add(-2*closedts.TargetDuration.Get(&st.SV).Nanoseconds(), 0)
nowPlusGlobalReadLead := nowTS.Add((maxOffset +
275*time.Millisecond /* sideTransportPropTime */ +
25*time.Millisecond /* bufferTime */).Nanoseconds(), 0).
WithSynthetic(true)
25*time.Millisecond /* bufferTime */).Nanoseconds(), 0)
expiredLeaseTimestamp := nowTS.Add(-1000, 0)
someClosedTS := nowTS.Add(-2000, 0)

Expand Down

0 comments on commit e8440fc

Please sign in to comment.