diff --git a/pkg/ccl/multiregionccl/regional_by_row_test.go b/pkg/ccl/multiregionccl/regional_by_row_test.go
index 9015f1d39003..e6bb0b4bd188 100644
--- a/pkg/ccl/multiregionccl/regional_by_row_test.go
+++ b/pkg/ccl/multiregionccl/regional_by_row_test.go
@@ -365,6 +365,15 @@ func TestAlterTableLocalityRegionalByRowError(t *testing.T) {
     // TTL into the system with AddImmediateGCZoneConfig.
     defer sqltestutils.DisableGCTTLStrictEnforcement(t, sqlDB)()
 
+    // Drop the closed timestamp target lead for GLOBAL tables.
+    // The test passes with it configured to its default, but it
+    // is very slow due to #61444 (35s instead of 2.5s).
+    // TODO(nvanbenschoten): We can remove this when that issue
+    // is addressed.
+    if _, err := sqlDB.Exec(`SET CLUSTER SETTING kv.closed_timestamp.lead_for_global_reads_override = '5ms'`); err != nil {
+        t.Fatal(err)
+    }
+
     if _, err := sqlDB.Exec(fmt.Sprintf(`
 CREATE DATABASE t PRIMARY REGION "ajstorm-1";
 USE t;
diff --git a/pkg/kv/kvserver/closedts/policy.go b/pkg/kv/kvserver/closedts/policy.go
index 509eb9ebe6a7..245bb544754d 100644
--- a/pkg/kv/kvserver/closedts/policy.go
+++ b/pkg/kv/kvserver/closedts/policy.go
@@ -23,19 +23,105 @@ func TargetForPolicy(
     now hlc.ClockTimestamp,
     maxClockOffset time.Duration,
     lagTargetDuration time.Duration,
+    leadTargetOverride time.Duration,
+    sideTransportCloseInterval time.Duration,
     policy roachpb.RangeClosedTimestampPolicy,
 ) hlc.Timestamp {
     switch policy {
-    case roachpb.LAG_BY_CLUSTER_SETTING, roachpb.LEAD_FOR_GLOBAL_READS:
-        return hlc.Timestamp{WallTime: now.WallTime - lagTargetDuration.Nanoseconds()}
-        // TODO(andrei,nvanbenschoten): Resolve all the issues preventing us from closing
-        // timestamps in the future (which, in turn, forces future-time writes on
-        // global ranges), and enable the proper logic below.
-        //case roachpb.LEAD_FOR_GLOBAL_READS:
-        //  closedTSTarget = hlc.Timestamp{
-        //      WallTime:  now + 2*maxClockOffset.Nanoseconds(),
-        //      Synthetic: true,
-        //  }
+    case roachpb.LAG_BY_CLUSTER_SETTING:
+        // Simple calculation: lag now by desired duration.
+        return now.ToTimestamp().Add(-lagTargetDuration.Nanoseconds(), 0)
+    case roachpb.LEAD_FOR_GLOBAL_READS:
+        // The LEAD_FOR_GLOBAL_READS calculation is more complex. Instead of the
+        // policy defining an offset from the publisher's perspective, the
+        // policy defines a goal from the consumer's perspective - the goal
+        // being that present time reads (with a possible uncertainty interval)
+        // can be served from all followers. To accomplish this, we must work
+        // backwards to establish a lead time to publish closed timestamps at.
+        //
+        // The calculation looks something like the following:
+        //
+        //  # This should be sufficient for any present-time transaction,
+        //  # because its global uncertainty limit should be <= this time.
+        //  # For more, see (*Transaction).RequiredFrontier.
+        //  closed_ts_at_follower = now + max_offset
+        //
+        //  # The sender must account for the time it takes to propagate a
+        //  # closed timestamp update to its followers.
+        //  closed_ts_at_sender = closed_ts_at_follower + propagation_time
+        //
+        //  # Closed timestamps propagate in two ways. Both need to make it to
+        //  # followers in time.
+        //  propagation_time = max(raft_propagation_time, side_propagation_time)
+        //
+        //  # Raft propagation takes 3 network hops to go from a leader proposing
+        //  # a write (with a closed timestamp update) to the write being applied.
+        //  # 1. leader sends MsgApp with entry
+        //  # 2. followers send MsgAppResp with acknowledgment
+        //  # 3. leader sends MsgApp with higher commit index
+        //  #
+        //  # We also add on a small bit of overhead for request evaluation, log
+        //  # sync, and state machine apply latency.
+        //  raft_propagation_time = max_network_rtt * 1.5 + raft_overhead
+        //
+        //  # Side-transport propagation takes 1 network hop, as there is no
+        //  # acknowledgment round. However, it is delayed by the full
+        //  # side_transport_close_interval in the worst case.
+        //  side_propagation_time = max_network_rtt * 0.5 + side_transport_close_interval
+        //
+        //  # Combining, we get the following result:
+        //  closed_ts_at_sender = now + max_offset + max(
+        //    max_network_rtt * 1.5 + raft_overhead,
+        //    max_network_rtt * 0.5 + side_transport_close_interval,
+        //  )
+        //
+        // By default, this leads to a closed timestamp target that leads the
+        // sender's current clock by 800ms.
+        //
+        // NOTE: this calculation takes into consideration maximum clock skew as
+        // it relates to a transaction's uncertainty interval, but it does not
+        // take into consideration "effective" clock skew as it relates to a
+        // follower replica having a faster clock than a leaseholder and
+        // therefore needing the leaseholder to publish even further into the
+        // future. Since the effect of getting this wrong is reduced performance
+        // (i.e. missed follower reads) and not a correctness violation (i.e.
+        // stale reads), we can be less strict here. We also expect that even
+        // when two nodes have skewed physical clocks, the "stability" property
+        // of HLC propagation when nodes are communicating should reduce the
+        // effective HLC clock skew.
+
+        // TODO(nvanbenschoten): make this dynamic, based on the measured
+        // network latencies recorded by the RPC context. This isn't trivial and
+        // brings up a number of questions. For instance, how far into the tail
+        // do we care about? Do we place upper and lower bounds on this value?
+        const maxNetworkRTT = 150 * time.Millisecond
+
+        // See raft_propagation_time.
+        const raftTransportOverhead = 20 * time.Millisecond
+        raftTransportPropTime := (maxNetworkRTT*3)/2 + raftTransportOverhead
+
+        // See side_propagation_time.
+        sideTransportPropTime := maxNetworkRTT/2 + sideTransportCloseInterval
+
+        // See propagation_time.
+        maxTransportPropTime := sideTransportPropTime
+        if maxTransportPropTime < raftTransportPropTime {
+            maxTransportPropTime = raftTransportPropTime
+        }
+
+        // Include a small amount of extra margin to smooth out temporary
+        // network blips or anything else that slows down closed timestamp
+        // propagation momentarily.
+        const bufferTime = 25 * time.Millisecond
+        leadTimeAtSender := maxTransportPropTime + maxClockOffset + bufferTime
+
+        // Override entirely with cluster setting, if necessary.
+        if leadTargetOverride != 0 {
+            leadTimeAtSender = leadTargetOverride
+        }
+
+        // Mark as synthetic, because this time is in the future.
+        return now.ToTimestamp().Add(leadTimeAtSender.Nanoseconds(), 0).WithSynthetic(true)
     default:
         panic("unexpected RangeClosedTimestampPolicy")
     }
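To make the 800ms figure in the comment above concrete, here is a standalone sketch of the same arithmetic under the defaults this patch assumes (500ms maximum clock offset, 200ms side-transport close interval). It is illustrative only; the constant names mirror, rather than reuse, the ones defined inside TargetForPolicy.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Assumed defaults: 500ms max clock offset and a 200ms side-transport
	// close interval (the default registered in setting.go below).
	const (
		maxClockOffset             = 500 * time.Millisecond
		sideTransportCloseInterval = 200 * time.Millisecond
		maxNetworkRTT              = 150 * time.Millisecond
		raftTransportOverhead      = 20 * time.Millisecond
		bufferTime                 = 25 * time.Millisecond
	)

	// raft_propagation_time = max_network_rtt * 1.5 + raft_overhead = 245ms.
	raftTransportPropTime := (maxNetworkRTT*3)/2 + raftTransportOverhead
	// side_propagation_time = max_network_rtt * 0.5 + close_interval = 275ms.
	sideTransportPropTime := maxNetworkRTT/2 + sideTransportCloseInterval

	// propagation_time = max(245ms, 275ms) = 275ms.
	propTime := raftTransportPropTime
	if sideTransportPropTime > propTime {
		propTime = sideTransportPropTime
	}

	// lead = propagation_time + max_offset + buffer = 275 + 500 + 25 = 800ms.
	lead := propTime + maxClockOffset + bufferTime
	fmt.Println(raftTransportPropTime, sideTransportPropTime, lead) // 245ms 275ms 800ms
}
```

With a shorter close interval the Raft path dominates instead: at 50ms the side-transport term drops to 125ms, so propagation time becomes the Raft path's 245ms. That is exactly the distinction the two LEAD_FOR_GLOBAL_READS cases in policy_test.go below exercise.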
diff --git a/pkg/kv/kvserver/closedts/policy_test.go b/pkg/kv/kvserver/closedts/policy_test.go
index 93f133bd033e..b43f8d9ec309 100644
--- a/pkg/kv/kvserver/closedts/policy_test.go
+++ b/pkg/kv/kvserver/closedts/policy_test.go
@@ -12,6 +12,7 @@ package closedts
 import (
     "testing"
+    "time"
 
     "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -24,29 +25,64 @@ func TestTargetForPolicy(t *testing.T) {
     defer leaktest.AfterTest(t)()
     defer log.Scope(t).Close(t)
 
-    const nowNanos = 100
-    const maxOffsetNanos = 20
-    const lagTargetNanos = 10
+    cast := func(i int, unit time.Duration) time.Duration { return time.Duration(i) * unit }
+    secs := func(i int) time.Duration { return cast(i, time.Second) }
+    millis := func(i int) time.Duration { return cast(i, time.Millisecond) }
+
+    now := hlc.Timestamp{WallTime: millis(100).Nanoseconds()}
+    maxClockOffset := millis(500)
 
     for _, tc := range []struct {
-        rangePolicy       roachpb.RangeClosedTimestampPolicy
-        expClosedTSTarget hlc.Timestamp
+        lagTargetNanos             time.Duration
+        leadTargetOverride         time.Duration
+        sideTransportCloseInterval time.Duration
+        rangePolicy                roachpb.RangeClosedTimestampPolicy
+        expClosedTSTarget          hlc.Timestamp
     }{
         {
+            lagTargetNanos:    secs(3),
+            rangePolicy:       roachpb.LAG_BY_CLUSTER_SETTING,
+            expClosedTSTarget: now.Add(-secs(3).Nanoseconds(), 0),
+        },
+        {
+            lagTargetNanos:    secs(1),
             rangePolicy:       roachpb.LAG_BY_CLUSTER_SETTING,
-            expClosedTSTarget: hlc.Timestamp{WallTime: nowNanos - lagTargetNanos},
+            expClosedTSTarget: now.Add(-secs(1).Nanoseconds(), 0),
+        },
+        {
+            sideTransportCloseInterval: millis(200),
+            rangePolicy:                roachpb.LEAD_FOR_GLOBAL_READS,
+            expClosedTSTarget: now.
+                Add((maxClockOffset +
+                    millis(275) /* sideTransportPropTime */ +
+                    millis(25) /* bufferTime */).Nanoseconds(), 0).
+                WithSynthetic(true),
+        },
+        {
+            sideTransportCloseInterval: millis(50),
+            rangePolicy:                roachpb.LEAD_FOR_GLOBAL_READS,
+            expClosedTSTarget: now.
+                Add((maxClockOffset +
+                    millis(245) /* raftTransportPropTime */ +
+                    millis(25) /* bufferTime */).Nanoseconds(), 0).
+                WithSynthetic(true),
         },
         {
-            rangePolicy:       roachpb.LEAD_FOR_GLOBAL_READS,
-            expClosedTSTarget: hlc.Timestamp{WallTime: nowNanos - lagTargetNanos},
-            // TODO(andrei, nvanbenschoten): What we should be expecting here is the following, once
-            // the propBuf starts properly implementing this timestamp closing policy:
-            // expClosedTSTarget: hlc.Timestamp{WallTime: nowNanos + 2*maxOffsetNanos, Synthetic: true},
+            leadTargetOverride:         millis(1234),
+            sideTransportCloseInterval: millis(200),
+            rangePolicy:                roachpb.LEAD_FOR_GLOBAL_READS,
+            expClosedTSTarget:          now.Add(millis(1234).Nanoseconds(), 0).WithSynthetic(true),
         },
     } {
-        t.Run(tc.rangePolicy.String(), func(t *testing.T) {
-            now := hlc.ClockTimestamp{WallTime: nowNanos}
-            target := TargetForPolicy(now, maxOffsetNanos, lagTargetNanos, tc.rangePolicy)
+        t.Run("", func(t *testing.T) {
+            target := TargetForPolicy(
+                now.UnsafeToClockTimestamp(),
+                maxClockOffset,
+                tc.lagTargetNanos,
+                tc.leadTargetOverride,
+                tc.sideTransportCloseInterval,
+                tc.rangePolicy,
+            )
             require.Equal(t, tc.expClosedTSTarget, target)
         })
     }
diff --git a/pkg/kv/kvserver/closedts/setting.go b/pkg/kv/kvserver/closedts/setting.go
index 36fc42cb0f36..16f77b5bf4c4 100644
--- a/pkg/kv/kvserver/closedts/setting.go
+++ b/pkg/kv/kvserver/closedts/setting.go
@@ -46,3 +46,14 @@ var SideTransportCloseInterval = settings.RegisterDurationSetting(
     200*time.Millisecond,
     settings.NonNegativeDuration,
 )
+
+// LeadForGlobalReadsOverride overrides the lead time that ranges with the
+// LEAD_FOR_GLOBAL_READS closed timestamp policy use to publish closed timestamps
+// (see TargetForPolicy), if it is set to a non-zero value. Meant as an escape
+// hatch.
+var LeadForGlobalReadsOverride = settings.RegisterDurationSetting(
+    "kv.closed_timestamp.lead_for_global_reads_override",
+    "if nonzero, overrides the lead time that global_read ranges use to publish closed timestamps",
+    0,
+    settings.NonNegativeDuration,
+)
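As a usage sketch for this escape hatch, a hypothetical unit test (not part of the patch; the test name and placement are ours) could pin the override and feed it through the new TargetForPolicy signature, expecting a target that leads the clock by exactly the override, with the synthetic bit set. Every call used below appears elsewhere in this diff.

```go
package closedts_test

import (
	"testing"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/stretchr/testify/require"
)

// TestLeadForGlobalReadsOverrideSketch is illustrative only: with the override
// pinned, the computed target leads the clock by exactly that amount,
// regardless of the propagation-time estimates.
func TestLeadForGlobalReadsOverrideSketch(t *testing.T) {
	mc := hlc.NewManualClock((1613588135 * time.Second).Nanoseconds())
	clock := hlc.NewClock(mc.UnixNano, 500*time.Millisecond)
	st := cluster.MakeTestingClusterSettings()
	closedts.LeadForGlobalReadsOverride.Override(&st.SV, 5*time.Millisecond)

	now := clock.NowAsClockTimestamp()
	target := closedts.TargetForPolicy(
		now,
		clock.MaxOffset(),
		closedts.TargetDuration.Get(&st.SV),
		closedts.LeadForGlobalReadsOverride.Get(&st.SV),
		closedts.SideTransportCloseInterval.Get(&st.SV),
		roachpb.LEAD_FOR_GLOBAL_READS,
	)
	exp := now.ToTimestamp().Add((5 * time.Millisecond).Nanoseconds(), 0).WithSynthetic(true)
	require.Equal(t, exp, target)
}
```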
diff --git a/pkg/kv/kvserver/closedts/sidetransport/sender.go b/pkg/kv/kvserver/closedts/sidetransport/sender.go
index 496ca8b13a36..e7c237236bbc 100644
--- a/pkg/kv/kvserver/closedts/sidetransport/sender.go
+++ b/pkg/kv/kvserver/closedts/sidetransport/sender.go
@@ -266,9 +266,18 @@ func (s *Sender) publish(ctx context.Context) hlc.ClockTimestamp {
     now := s.clock.NowAsClockTimestamp()
     maxClockOffset := s.clock.MaxOffset()
     lagTargetDuration := closedts.TargetDuration.Get(&s.st.SV)
+    leadTargetOverride := closedts.LeadForGlobalReadsOverride.Get(&s.st.SV)
+    sideTransportCloseInterval := closedts.SideTransportCloseInterval.Get(&s.st.SV)
     for i := range s.trackedMu.lastClosed {
         pol := roachpb.RangeClosedTimestampPolicy(i)
-        target := closedts.TargetForPolicy(now, maxClockOffset, lagTargetDuration, pol)
+        target := closedts.TargetForPolicy(
+            now,
+            maxClockOffset,
+            lagTargetDuration,
+            leadTargetOverride,
+            sideTransportCloseInterval,
+            pol,
+        )
         s.trackedMu.lastClosed[pol] = target
         msg.ClosedTimestamps[pol] = ctpb.Update_GroupUpdate{
             Policy: pol,
diff --git a/pkg/kv/kvserver/closedts/sidetransport/sender_test.go b/pkg/kv/kvserver/closedts/sidetransport/sender_test.go
index ddf0a63c3bcd..19e3f29a8496 100644
--- a/pkg/kv/kvserver/closedts/sidetransport/sender_test.go
+++ b/pkg/kv/kvserver/closedts/sidetransport/sender_test.go
@@ -89,10 +89,15 @@ func newMockReplica(id roachpb.RangeID, nodes ...roachpb.NodeID) *mockReplica {
 }
 
 func expGroupUpdates(s *Sender, now hlc.ClockTimestamp) []ctpb.Update_GroupUpdate {
-    maxClockOffset := s.clock.MaxOffset()
-    lagTargetDuration := closedts.TargetDuration.Get(&s.st.SV)
     targetForPolicy := func(pol roachpb.RangeClosedTimestampPolicy) hlc.Timestamp {
-        return closedts.TargetForPolicy(now, maxClockOffset, lagTargetDuration, pol)
+        return closedts.TargetForPolicy(
+            now,
+            s.clock.MaxOffset(),
+            closedts.TargetDuration.Get(&s.st.SV),
+            closedts.LeadForGlobalReadsOverride.Get(&s.st.SV),
+            closedts.SideTransportCloseInterval.Get(&s.st.SV),
+            pol,
+        )
     }
     return []ctpb.Update_GroupUpdate{
         {Policy: roachpb.LAG_BY_CLUSTER_SETTING, ClosedTimestamp: targetForPolicy(roachpb.LAG_BY_CLUSTER_SETTING)},
diff --git a/pkg/kv/kvserver/replica_closedts.go b/pkg/kv/kvserver/replica_closedts.go
index 5bfc5619e782..29bdd7e2eb44 100644
--- a/pkg/kv/kvserver/replica_closedts.go
+++ b/pkg/kv/kvserver/replica_closedts.go
@@ -144,9 +144,12 @@ func (r *Replica) BumpSideTransportClosed(
 // this range. Note that we might not be able to ultimately close this timestamp
 // if there are requests in flight.
 func (r *Replica) closedTimestampTargetRLocked() hlc.Timestamp {
-    now := r.Clock().NowAsClockTimestamp()
-    maxClockOffset := r.Clock().MaxOffset()
-    lagTargetDuration := closedts.TargetDuration.Get(&r.ClusterSettings().SV)
-    policy := r.closedTimestampPolicyRLocked()
-    return closedts.TargetForPolicy(now, maxClockOffset, lagTargetDuration, policy)
+    return closedts.TargetForPolicy(
+        r.Clock().NowAsClockTimestamp(),
+        r.Clock().MaxOffset(),
+        closedts.TargetDuration.Get(&r.ClusterSettings().SV),
+        closedts.LeadForGlobalReadsOverride.Get(&r.ClusterSettings().SV),
+        closedts.SideTransportCloseInterval.Get(&r.ClusterSettings().SV),
+        r.closedTimestampPolicyRLocked(),
+    )
 }
diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go
index efe0a6a9c91a..2cd0efb6cb4d 100644
--- a/pkg/kv/kvserver/replica_proposal_buf_test.go
+++ b/pkg/kv/kvserver/replica_proposal_buf_test.go
@@ -126,7 +126,12 @@ func (t *testProposer) closedTimestampTarget() hlc.Timestamp {
         return hlc.Timestamp{}
     }
     return closedts.TargetForPolicy(
-        t.clock.NowAsClockTimestamp(), t.clock.MaxOffset(), time.Second, t.rangePolicy,
+        t.clock.NowAsClockTimestamp(),
+        t.clock.MaxOffset(),
+        1*time.Second,
+        0,
+        200*time.Millisecond,
+        t.rangePolicy,
     )
 }
 
@@ -665,20 +670,22 @@ func TestProposalBufferClosedTimestamp(t *testing.T) {
     defer log.Scope(t).Close(t)
     ctx := context.Background()
 
+    const maxOffset = 500 * time.Millisecond
     mc := hlc.NewManualClock((1613588135 * time.Second).Nanoseconds())
-    clock := hlc.NewClock(mc.UnixNano, time.Nanosecond)
+    clock := hlc.NewClock(mc.UnixNano, maxOffset)
     st := cluster.MakeTestingClusterSettings()
     closedts.TargetDuration.Override(&st.SV, time.Second)
-    now := clock.Now()
-    newLeaseStart := now.MustToClockTimestamp()
-    nowMinusClosedLag := hlc.Timestamp{
-        WallTime: mc.UnixNano() - closedts.TargetDuration.Get(&st.SV).Nanoseconds(),
-    }
-    nowMinusTwiceClosedLag := hlc.Timestamp{
-        WallTime: mc.UnixNano() - 2*closedts.TargetDuration.Get(&st.SV).Nanoseconds(),
-    }
-    expiredLeaseTimestamp := hlc.Timestamp{WallTime: mc.UnixNano() - 1000}
-    someClosedTS := hlc.Timestamp{WallTime: mc.UnixNano() - 2000}
+    closedts.SideTransportCloseInterval.Override(&st.SV, 200*time.Millisecond)
+    now := clock.NowAsClockTimestamp()
+    nowTS := now.ToTimestamp()
+    nowMinusClosedLag := nowTS.Add(-closedts.TargetDuration.Get(&st.SV).Nanoseconds(), 0)
+    nowMinusTwiceClosedLag := nowTS.Add(-2*closedts.TargetDuration.Get(&st.SV).Nanoseconds(), 0)
+    nowPlusGlobalReadLead := nowTS.Add((maxOffset +
+        275*time.Millisecond /* sideTransportPropTime */ +
+        25*time.Millisecond /* bufferTime */).Nanoseconds(), 0).
+        WithSynthetic(true)
+    expiredLeaseTimestamp := nowTS.Add(-1000, 0)
+    someClosedTS := nowTS.Add(-2000, 0)
 
     type reqType int
     checkClosedTS := func(t *testing.T, r *testProposerRaft, exp hlc.Timestamp) {
@@ -687,7 +694,9 @@ func TestProposalBufferClosedTimestamp(t *testing.T) {
             require.Nil(t, r.lastProps[0].ClosedTimestamp)
         } else {
             require.NotNil(t, r.lastProps[0].ClosedTimestamp)
-            require.Equal(t, exp, *r.lastProps[0].ClosedTimestamp)
+            closedTS := *r.lastProps[0].ClosedTimestamp
+            closedTS.Logical = 0 // ignore logical ticks from clock
+            require.Equal(t, exp, closedTS)
         }
     }
 
@@ -768,14 +777,14 @@ func TestProposalBufferClosedTimestamp(t *testing.T) {
             lease: roachpb.Lease{
                 // Higher sequence => this is a brand new lease, not an extension.
                 Sequence: curLease.Sequence + 1,
-                Start:    newLeaseStart,
+                Start:    now,
             },
             trackerLowerBound: hlc.Timestamp{},
             // The current lease can be expired; we won't backtrack the closed
             // timestamp to this expiration.
             leaseExp:    expiredLeaseTimestamp,
             rangePolicy: roachpb.LAG_BY_CLUSTER_SETTING,
-            expClosed:   newLeaseStart.ToTimestamp(),
+            expClosed:   now.ToTimestamp(),
         },
         {
             name: "lease extension",
@@ -783,7 +792,7 @@ func TestProposalBufferClosedTimestamp(t *testing.T) {
             lease: roachpb.Lease{
                 // Same sequence => this is a lease extension.
                 Sequence: curLease.Sequence,
-                Start:    newLeaseStart,
+                Start:    now,
             },
             trackerLowerBound: hlc.Timestamp{},
             // The current lease can be expired; we won't backtrack the closed
@@ -801,7 +810,7 @@ func TestProposalBufferClosedTimestamp(t *testing.T) {
             reqType: leaseTransfer,
             lease: roachpb.Lease{
                 Sequence: curLease.Sequence + 1,
-                Start:    newLeaseStart,
+                Start:    now,
             },
             trackerLowerBound: hlc.Timestamp{},
             leaseExp:          hlc.MaxTimestamp,
@@ -811,17 +820,13 @@ func TestProposalBufferClosedTimestamp(t *testing.T) {
         {
             // With the LEAD_FOR_GLOBAL_READS policy, we're expecting to close
             // timestamps in the future.
-            // TODO(andrei,nvanbenschoten): The global policy is not actually hooked
-            // up at the moment, so this test expects a past timestamp to be closed.
-            // Once it is hooked up, we should also add another test that checks that
-            // timestamps above the current lease expiration are not closed.
             name:                "global range",
             reqType:             regularWrite,
             trackerLowerBound:   hlc.Timestamp{},
             leaseExp:            hlc.MaxTimestamp,
             rangePolicy:         roachpb.LEAD_FOR_GLOBAL_READS,
             prevClosedTimestamp: hlc.Timestamp{},
-            expClosed:           nowMinusClosedLag,
+            expClosed:           nowPlusGlobalReadLead,
         },
     } {
         t.Run(tc.name, func(t *testing.T) {
@@ -853,7 +858,7 @@ func TestProposalBufferClosedTimestamp(t *testing.T) {
             var ba roachpb.BatchRequest
             ba.Add(&roachpb.TransferLeaseRequest{
                 Lease: roachpb.Lease{
-                    Start:    now.MustToClockTimestamp(),
+                    Start:    now,
                     Sequence: pc.lease.Lease.Sequence + 1,
                 },
                 PrevLease: pc.lease.Lease,
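A note on the hlc change below: because LEAD_FOR_GLOBAL_READS targets are now synthetic, future-time values, they cannot be cast back into ClockTimestamps. That is why the test above switched to holding a ClockTimestamp from NowAsClockTimestamp, and why MustToClockTimestamp can be removed. The small sketch below, with a helper name of our own choosing, illustrates the upcast-only direction the patch standardizes on.

```go
package sketch // illustrative only, not part of the patch

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// leaseStartAndTarget mirrors the pattern the updated test uses: take one
// clock reading as a ClockTimestamp (usable as a lease start), then upcast it
// with ToTimestamp to build the possibly-synthetic closed timestamp target.
// The reverse cast is not safe, since the target may be synthetic.
func leaseStartAndTarget(clock *hlc.Clock, lead time.Duration) (hlc.ClockTimestamp, hlc.Timestamp) {
	now := clock.NowAsClockTimestamp()
	target := now.ToTimestamp().Add(lead.Nanoseconds(), 0).WithSynthetic(true)
	return now, target
}
```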
diff --git a/pkg/util/hlc/timestamp.go b/pkg/util/hlc/timestamp.go
index 550cab61b671..58d273968e56 100644
--- a/pkg/util/hlc/timestamp.go
+++ b/pkg/util/hlc/timestamp.go
@@ -368,15 +368,6 @@ func (t Timestamp) UnsafeToClockTimestamp() ClockTimestamp {
     return ClockTimestamp(t)
 }
 
-// MustToClockTimestamp casts a Timestamp to a ClockTimestamp. Panics if the
-// timestamp is synthetic. See TryToClockTimestamp if you don't want to panic.
-func (t Timestamp) MustToClockTimestamp() ClockTimestamp {
-    if t.Synthetic {
-        panic(fmt.Sprintf("can't convert synthetic timestamp to ClockTimestamp: %s", t))
-    }
-    return ClockTimestamp(t)
-}
-
 // ToTimestamp upcasts a ClockTimestamp into a Timestamp.
 func (t ClockTimestamp) ToTimestamp() Timestamp {
     if t.Synthetic {