From e8440fc5525921a9f322dca77df2dddddb7b7e41 Mon Sep 17 00:00:00 2001
From: Nathan VanBenschoten
Date: Tue, 19 Dec 2023 13:48:35 -0500
Subject: [PATCH] kv/closedts: don't set synthetic timestamp bit on global
 table closed ts

Informs #101938.

This commit stops setting the synthetic timestamp bit on the closed
timestamps selected for ranges with a LEAD_FOR_GLOBAL_READS closed
timestamp policy. This flag has been deprecated since v22.2 and is no
longer consulted in uncertainty interval checks or by transaction
commit-wait. It does not need to be propagated from evaluating requests
to the closed timestamp.

Release note: None
---
 pkg/kv/kvserver/client_split_test.go          | 31 ++++++++++++-------
 pkg/kv/kvserver/closedts/policy.go            |  3 +-
 pkg/kv/kvserver/closedts/policy_test.go       |  8 ++---
 .../closedts/sidetransport/receiver_test.go   |  6 ++--
 pkg/kv/kvserver/kvserverpb/lease_status.go    |  7 +----
 pkg/kv/kvserver/replica_proposal_buf_test.go  |  3 +-
 6 files changed, 28 insertions(+), 30 deletions(-)

diff --git a/pkg/kv/kvserver/client_split_test.go b/pkg/kv/kvserver/client_split_test.go
index 13710d3f2d22..e390f99ff3c7 100644
--- a/pkg/kv/kvserver/client_split_test.go
+++ b/pkg/kv/kvserver/client_split_test.go
@@ -3602,28 +3602,32 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
 	defer log.Scope(t).Close(t)
 
 	// Detect splits and merges over the global read ranges. Assert that the split
-	// and merge transactions commit with synthetic timestamps, and that the
+	// and merge transactions commit with pushed write timestamps, and that the
 	// commit-wait sleep for these transactions is performed before running their
 	// commit triggers instead of run on the kv client. For details on why this is
 	// necessary, see maybeCommitWaitBeforeCommitTrigger.
-	var clock atomic.Value
-	var splitsWithSyntheticTS, mergesWithSyntheticTS int64
+	var clockPtr atomic.Pointer[hlc.Clock]
+	var splits, merges int64
 	respFilter := func(ctx context.Context, ba *kvpb.BatchRequest, br *kvpb.BatchResponse) *kvpb.Error {
+		clock := clockPtr.Load()
+		if clock == nil {
+			return nil
+		}
 		if req, ok := ba.GetArg(kvpb.EndTxn); ok {
 			endTxn := req.(*kvpb.EndTxnRequest)
-			if br.Txn.Status == roachpb.COMMITTED && br.Txn.WriteTimestamp.Synthetic {
+			if br.Txn.Status == roachpb.COMMITTED && br.Txn.MinTimestamp.Less(br.Txn.WriteTimestamp) {
 				if ct := endTxn.InternalCommitTrigger; ct != nil {
 					// The server-side commit-wait sleep should ensure that the commit
 					// triggers are only run after the commit timestamp is below present
 					// time.
-					now := clock.Load().(*hlc.Clock).Now()
+					now := clock.Now()
 					require.True(t, br.Txn.WriteTimestamp.Less(now))
 					switch {
 					case ct.SplitTrigger != nil:
-						atomic.AddInt64(&splitsWithSyntheticTS, 1)
+						atomic.AddInt64(&splits, 1)
 					case ct.MergeTrigger != nil:
-						atomic.AddInt64(&mergesWithSyntheticTS, 1)
+						atomic.AddInt64(&merges, 1)
 					}
 				}
 			}
@@ -3655,7 +3659,6 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
 	tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '20ms'`)
 	tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.side_transport_interval = '20ms'`)
 	tdb.Exec(t, `SET CLUSTER SETTING kv.rangefeed.closed_timestamp_refresh_interval = '20ms'`)
-	clock.Store(s.Clock())
 	store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
 	require.NoError(t, err)
 	config.TestingSetupZoneConfigHook(s.Stopper())
@@ -3667,6 +3670,10 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
 	_, pErr := kv.SendWrapped(ctx, store.TestSender(), splitArgs)
 	require.Nil(t, pErr)
 
+	// Set the clock to the store's clock, which also serves to engage the
+	// response filter.
+	clockPtr.Store(s.Clock())
+
 	// Perform a write to the system config span being watched by
 	// the SystemConfigProvider.
 	tdb.Exec(t, "CREATE TABLE foo ()")
@@ -3685,8 +3692,8 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
 		if splitCount != store.Metrics().CommitWaitsBeforeCommitTrigger.Count() {
 			return errors.Errorf("commit wait count is %d", store.Metrics().CommitWaitsBeforeCommitTrigger.Count())
 		}
-		if splitCount != atomic.LoadInt64(&splitsWithSyntheticTS) {
-			return errors.Errorf("num splits is %d", atomic.LoadInt64(&splitsWithSyntheticTS))
+		if splitCount != atomic.LoadInt64(&splits) {
+			return errors.Errorf("num splits is %d", atomic.LoadInt64(&splits))
 		}
 		return nil
 	})
@@ -3703,7 +3710,7 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
 	require.Nil(t, pErr)
 	splitCount++
 	require.Equal(t, splitCount, store.Metrics().CommitWaitsBeforeCommitTrigger.Count())
-	require.Equal(t, splitCount, atomic.LoadInt64(&splitsWithSyntheticTS))
+	require.Equal(t, splitCount, atomic.LoadInt64(&splits))
 
 	repl := store.LookupReplica(roachpb.RKey(splitKey))
 	require.Equal(t, splitKey, repl.Desc().StartKey.AsRawKey())
@@ -3713,7 +3720,7 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {
 	_, pErr = kv.SendWrapped(ctx, store.TestSender(), mergeArgs)
 	require.Nil(t, pErr)
 	require.Equal(t, splitCount+1, store.Metrics().CommitWaitsBeforeCommitTrigger.Count())
-	require.Equal(t, int64(1), atomic.LoadInt64(&mergesWithSyntheticTS))
+	require.Equal(t, int64(1), atomic.LoadInt64(&merges))
 
 	repl = store.LookupReplica(roachpb.RKey(splitKey))
 	require.Equal(t, descKey, repl.Desc().StartKey.AsRawKey())
diff --git a/pkg/kv/kvserver/closedts/policy.go b/pkg/kv/kvserver/closedts/policy.go
index 311d65a57c95..7e3b13371213 100644
--- a/pkg/kv/kvserver/closedts/policy.go
+++ b/pkg/kv/kvserver/closedts/policy.go
@@ -121,8 +121,7 @@ func TargetForPolicy(
 			leadTimeAtSender = leadTargetOverride
 		}
 
-		// Mark as synthetic, because this time is in the future.
-		res = now.ToTimestamp().Add(leadTimeAtSender.Nanoseconds(), 0).WithSynthetic(true)
+		res = now.ToTimestamp().Add(leadTimeAtSender.Nanoseconds(), 0)
 	default:
 		panic("unexpected RangeClosedTimestampPolicy")
 	}
diff --git a/pkg/kv/kvserver/closedts/policy_test.go b/pkg/kv/kvserver/closedts/policy_test.go
index b43f8d9ec309..6ed5c503f867 100644
--- a/pkg/kv/kvserver/closedts/policy_test.go
+++ b/pkg/kv/kvserver/closedts/policy_test.go
@@ -55,8 +55,7 @@ func TestTargetForPolicy(t *testing.T) {
 			expClosedTSTarget: now.
 				Add((maxClockOffset +
 					millis(275) /* sideTransportPropTime */ +
-					millis(25) /* bufferTime */).Nanoseconds(), 0).
-				WithSynthetic(true),
+					millis(25) /* bufferTime */).Nanoseconds(), 0),
 		},
 		{
 			sideTransportCloseInterval: millis(50),
@@ -64,14 +63,13 @@ func TestTargetForPolicy(t *testing.T) {
 			expClosedTSTarget: now.
 				Add((maxClockOffset +
 					millis(245) /* raftTransportPropTime */ +
-					millis(25) /* bufferTime */).Nanoseconds(), 0).
-				WithSynthetic(true),
+					millis(25) /* bufferTime */).Nanoseconds(), 0),
 		},
 		{
 			leadTargetOverride: millis(1234),
 			sideTransportCloseInterval: millis(200),
 			rangePolicy: roachpb.LEAD_FOR_GLOBAL_READS,
-			expClosedTSTarget: now.Add(millis(1234).Nanoseconds(), 0).WithSynthetic(true),
+			expClosedTSTarget: now.Add(millis(1234).Nanoseconds(), 0),
 		},
 	} {
 		t.Run("", func(t *testing.T) {
diff --git a/pkg/kv/kvserver/closedts/sidetransport/receiver_test.go b/pkg/kv/kvserver/closedts/sidetransport/receiver_test.go
index 95ae9e359624..7ea7449c08f6 100644
--- a/pkg/kv/kvserver/closedts/sidetransport/receiver_test.go
+++ b/pkg/kv/kvserver/closedts/sidetransport/receiver_test.go
@@ -62,9 +62,9 @@ func (m *mockStores) getAndClearRecording() []rangeUpdate {
 var ts10 = hlc.Timestamp{WallTime: 10}
 var ts11 = hlc.Timestamp{WallTime: 11}
 var ts12 = hlc.Timestamp{WallTime: 12}
-var ts20 = hlc.Timestamp{WallTime: 20, Synthetic: true}
-var ts21 = hlc.Timestamp{WallTime: 21, Synthetic: true}
-var ts22 = hlc.Timestamp{WallTime: 22, Synthetic: true}
+var ts20 = hlc.Timestamp{WallTime: 20}
+var ts21 = hlc.Timestamp{WallTime: 21}
+var ts22 = hlc.Timestamp{WallTime: 22}
 var laiZero = kvpb.LeaseAppliedIndex(0)
 
 const lai100 = kvpb.LeaseAppliedIndex(100)
diff --git a/pkg/kv/kvserver/kvserverpb/lease_status.go b/pkg/kv/kvserver/kvserverpb/lease_status.go
index 3f1c812416f8..d5fbfdd4e7d6 100644
--- a/pkg/kv/kvserver/kvserverpb/lease_status.go
+++ b/pkg/kv/kvserver/kvserverpb/lease_status.go
@@ -60,10 +60,5 @@ func (st LeaseStatus) Expiration() hlc.Timestamp {
 // Until a new lease is acquired, all writes will be pushed into this last
 // nanosecond of the lease.
 func (st LeaseStatus) ClosedTimestampUpperBound() hlc.Timestamp {
-	// HACK(andrei): We declare the lease expiration to be synthetic by fiat,
-	// because it frequently is synthetic even though currently it's not marked
-	// as such. See the TODO in Timestamp.Add() about the work remaining to
-	// properly mark these timestamps as synthetic. We need to make sure it's
-	// synthetic here so that the results of Backwards() can be synthetic.
-	return st.Expiration().WithSynthetic(true).WallPrev()
+	return st.Expiration().WallPrev()
 }
diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go
index 2f58569658b1..34f0a3bce9d0 100644
--- a/pkg/kv/kvserver/replica_proposal_buf_test.go
+++ b/pkg/kv/kvserver/replica_proposal_buf_test.go
@@ -924,8 +924,7 @@ func TestProposalBufferClosedTimestamp(t *testing.T) {
 	nowMinusTwiceClosedLag := nowTS.Add(-2*closedts.TargetDuration.Get(&st.SV).Nanoseconds(), 0)
 	nowPlusGlobalReadLead := nowTS.Add((maxOffset +
 		275*time.Millisecond /* sideTransportPropTime */ +
-		25*time.Millisecond /* bufferTime */).Nanoseconds(), 0).
-		WithSynthetic(true)
+		25*time.Millisecond /* bufferTime */).Nanoseconds(), 0)
 	expiredLeaseTimestamp := nowTS.Add(-1000, 0)
 	someClosedTS := nowTS.Add(-2000, 0)