kv: configure leading closed timestamp target for global_read ranges
Informs #59680.
Informs #52745.

This commit updates `closedts.TargetForPolicy` to calculate a target closed
timestamp that leads present time for ranges with the LEAD_FOR_GLOBAL_READS
closed timestamp policy. This is needed for non-blocking transactions, which
require ranges to close timestamps in the future.

TargetForPolicy's LEAD_FOR_GLOBAL_READS calculation is more complex than its
LAG_BY_CLUSTER_SETTING calculation. Instead of the policy defining an offset
from the publisher's perspective, the policy defines a goal from the consumer's
perspective - the goal being that present time reads (with a possible
uncertainty interval) can be served from all followers. To accomplish this, we
must work backwards to establish a lead time to publish closed timestamps at.

The calculation looks something like the following:
```
// this should be sufficient for any present-time transaction,
// because its global uncertainty limit should be <= this time.
// For more, see (*Transaction).RequiredFrontier.
closed_ts_at_follower = now + max_offset

// the sender must account for the time it takes to propagate a
// closed timestamp update to its followers.
closed_ts_at_sender = closed_ts_at_follower + propagation_time

// closed timestamps propagate in two ways. Both need to make it to
// followers in time.
propagation_time = max(raft_propagation_time, side_propagation_time)

// raft propagation takes 3 network hops to go from a leader proposing
// a write (with a closed timestamp update) to the write being applied.
// 1. leader sends MsgProp with entry
// 2. followers send MsgPropResp with vote
// 3. leader sends MsgProp with higher commit index
//
// we also add on a small bit of overhead for request evaluation, log
// sync, and state machine apply latency.
raft_propagation_time = max_network_rtt * 1.5 + raft_overhead

// side-transport propagation takes 1 network hop, as there is no voting.
// However, it is delayed by the full side_transport_close_interval in
// the worst case.
side_propagation_time = max_network_rtt * 0.5 + side_transport_close_interval

// put together, we get the following result
closed_ts_at_sender = now + max_offset + max(
	max_network_rtt * 1.5 + raft_overhead,
	max_network_rtt * 0.5 + side_transport_close_interval,
)
```
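For concreteness, the following self-contained Go sketch plugs in the values the commit ends up using: the hardcoded 150ms max network RTT and 20ms raft overhead, the 25ms smoothing buffer the code adds on top of the formula above, the default 500ms `--max-offset`, and the default 200ms side-transport close interval. It reproduces the ~800ms default lead target mentioned in the code comments below.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Constants hardcoded by this commit (see policy.go below), plus the
	// stock defaults for max clock offset and the side-transport close interval.
	const (
		maxClockOffset             = 500 * time.Millisecond
		maxNetworkRTT              = 150 * time.Millisecond
		raftTransportOverhead      = 20 * time.Millisecond
		sideTransportCloseInterval = 200 * time.Millisecond
		bufferTime                 = 25 * time.Millisecond // extra smoothing margin added in the code
	)

	raftPropTime := (maxNetworkRTT*3)/2 + raftTransportOverhead  // 245ms
	sidePropTime := maxNetworkRTT/2 + sideTransportCloseInterval // 275ms

	propTime := raftPropTime
	if sidePropTime > propTime {
		propTime = sidePropTime
	}

	// 500ms + 275ms + 25ms = 800ms: the default lead of the closed
	// timestamp target over the sender's clock.
	fmt.Println(maxClockOffset + propTime + bufferTime)
}
```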

While writing this, I explored what it would take to use dynamic network latency
measurements in this calculation to complete #59680. The code for that wasn't
too bad, but brought up a number of questions, including how far into the tail
we care about and whether to place upper and lower bounds on this value. To
avoid needing to immediately answer these questions, the commit hardcodes a
maximum network RTT of 150ms, which should be an overestimate for almost any
cluster we expect to run on.
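
Purely as an illustration of what a dynamic version might eventually look like (none of this is in the commit, and the measured-RTT input is an assumed placeholder), one option is to clamp a tail latency measurement between fixed bounds:

```go
package closedts // hypothetical placement, not part of this commit

import "time"

// dynamicMaxNetworkRTT sketches how a measured tail RTT (say, a p99 taken
// from the RPC context's latency tracking) might be bounded before being
// fed into the lead-time calculation. The percentile choice and the bounds
// are exactly the open questions mentioned above.
func dynamicMaxNetworkRTT(measuredTailRTT time.Duration) time.Duration {
	const (
		minRTT = 10 * time.Millisecond  // floor: ignore implausibly low samples
		maxRTT = 150 * time.Millisecond // ceiling: fall back to today's hardcoded value
	)
	if measuredTailRTT < minRTT {
		return minRTT
	}
	if measuredTailRTT > maxRTT {
		return maxRTT
	}
	return measuredTailRTT
}
```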

The commit also adds a new `kv.closed_timestamp.lead_for_global_reads_override`
cluster setting, which, if nonzero, overrides the lead time that global_read
ranges use to publish closed timestamps. The cluster setting is hidden, but
should provide an escape hatch for cases where we get the calculation
(especially when it becomes dynamic) wrong.
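
Since the override is an ordinary cluster setting, flipping it is a one-line SQL statement. The sketch below assumes a local insecure cluster on the default port and the lib/pq driver; the test change below does the same thing through its test harness.

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // any Postgres-wire driver works
)

func main() {
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// A nonzero value replaces the computed lead time for global_read ranges;
	// setting it back to '0s' restores the default calculation above.
	if _, err := db.Exec(
		`SET CLUSTER SETTING kv.closed_timestamp.lead_for_global_reads_override = '250ms'`,
	); err != nil {
		log.Fatal(err)
	}
}
```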

Release justification: needed for new functionality
nvanbenschoten committed Mar 4, 2021
1 parent 34dc5ae commit b320c03
Showing 9 changed files with 220 additions and 65 deletions.
9 changes: 9 additions & 0 deletions pkg/ccl/multiregionccl/regional_by_row_test.go
@@ -365,6 +365,15 @@ func TestAlterTableLocalityRegionalByRowError(t *testing.T) {
// TTL into the system with AddImmediateGCZoneConfig.
defer sqltestutils.DisableGCTTLStrictEnforcement(t, sqlDB)()

// Drop the closed timestamp target lead for GLOBAL tables.
// The test passes with it configured to its default, but it
// is very slow due to #61444 (2.5s vs. 35s).
// TODO(nvanbenschoten): We can remove this when that issue
// is addressed.
if _, err := sqlDB.Exec(`SET CLUSTER SETTING kv.closed_timestamp.lead_for_global_reads_override = '5ms'`); err != nil {
t.Fatal(err)
}

if _, err := sqlDB.Exec(fmt.Sprintf(`
CREATE DATABASE t PRIMARY REGION "ajstorm-1";
USE t;
106 changes: 96 additions & 10 deletions pkg/kv/kvserver/closedts/policy.go
@@ -23,19 +23,105 @@ func TargetForPolicy(
now hlc.ClockTimestamp,
maxClockOffset time.Duration,
lagTargetDuration time.Duration,
leadTargetOverride time.Duration,
sideTransportCloseInterval time.Duration,
policy roachpb.RangeClosedTimestampPolicy,
) hlc.Timestamp {
switch policy {
case roachpb.LAG_BY_CLUSTER_SETTING, roachpb.LEAD_FOR_GLOBAL_READS:
return hlc.Timestamp{WallTime: now.WallTime - lagTargetDuration.Nanoseconds()}
// TODO(andrei,nvanbenschoten): Resolve all the issues preventing us from closing
// timestamps in the future (which, in turn, forces future-time writes on
// global ranges), and enable the proper logic below.
//case roachpb.LEAD_FOR_GLOBAL_READS:
// closedTSTarget = hlc.Timestamp{
// WallTime: now + 2*maxClockOffset.Nanoseconds(),
// Synthetic: true,
// }
case roachpb.LAG_BY_CLUSTER_SETTING:
// Simple calculation: lag now by desired duration.
return now.ToTimestamp().Add(-lagTargetDuration.Nanoseconds(), 0)
case roachpb.LEAD_FOR_GLOBAL_READS:
// The LEAD_FOR_GLOBAL_READS calculation is more complex. Instead of the
// policy defining an offset from the publisher's perspective, the
// policy defines a goal from the consumer's perspective - the goal
// being that present time reads (with a possible uncertainty interval)
// can be served from all followers. To accomplish this, we must work
// backwards to establish a lead time to publish closed timestamps at.
//
// The calculation looks something like the following:
//
// # This should be sufficient for any present-time transaction,
// # because its global uncertainty limit should be <= this time.
// # For more, see (*Transaction).RequiredFrontier.
// closed_ts_at_follower = now + max_offset
//
// # The sender must account for the time it takes to propagate a
// # closed timestamp update to its followers.
// closed_ts_at_sender = closed_ts_at_follower + propagation_time
//
// # Closed timestamps propagate in two ways. Both need to make it to
// # followers in time.
// propagation_time = max(raft_propagation_time, side_propagation_time)
//
// # Raft propagation takes 3 network hops to go from a leader proposing
// # a write (with a closed timestamp update) to the write being applied.
// # 1. leader sends MsgProp with entry
// # 2. followers send MsgPropResp with vote
// # 3. leader sends MsgProp with higher commit index
// #
// # We also add on a small bit of overhead for request evaluation, log
// # sync, and state machine apply latency.
// raft_propagation_time = max_network_rtt * 1.5 + raft_overhead
//
// # Side-transport propagation takes 1 network hop, as there is no voting.
// # However, it is delayed by the full side_transport_close_interval in
// # the worst case.
// side_propagation_time = max_network_rtt * 0.5 + side_transport_close_interval
//
// # Combined, we get the following result
// closed_ts_at_sender = now + max_offset + max(
// max_network_rtt * 1.5 + raft_overhead,
// max_network_rtt * 0.5 + side_transport_close_interval,
// )
//
// By default, this leads to a closed timestamp target that leads the
// sender's current clock by 800ms.
//
// NOTE: this calculation takes into consideration maximum clock skew as
// it relates to a transaction's uncertainty interval, but it does not
// take into consideration "effective" clock skew as it relates to a
// follower replica having a faster clock than a leaseholder and
// therefore needing the leaseholder to publish even further into the
// future. Since the effect of getting this wrong is reduced performance
// (i.e. missed follower reads) and not a correctness violation (i.e.
// stale reads), we can be less strict here. We also expect that even
// when two nodes have skewed physical clocks, the "stability" property
// of HLC propagation when nodes are communicating should reduce the
// effective HLC clock skew.

// TODO(nvanbenschoten): make this dynamic, based on the measured
// network latencies recorded by the RPC context. This isn't trivial and
// brings up a number of questions. For instance, how far into the tail
// do we care about? Do we place upper and lower bounds on this value?
const maxNetworkRTT = 150 * time.Millisecond

// See raft_propagation_time.
const raftTransportOverhead = 20 * time.Millisecond
raftTransportPropTime := (maxNetworkRTT*3)/2 + raftTransportOverhead

// See side_propagation_time.
sideTransportPropTime := maxNetworkRTT/2 + sideTransportCloseInterval

// See propagation_time.
maxTransportPropTime := sideTransportPropTime
if maxTransportPropTime < raftTransportPropTime {
maxTransportPropTime = raftTransportPropTime
}

// Include a small amount of extra margin to smooth out temporary
// network blips or anything else that slows down closed timestamp
// propagation momentarily.
const bufferTime = 25 * time.Millisecond
leadTimeAtSender := maxTransportPropTime + maxClockOffset + bufferTime

// Override entirely with cluster setting, if necessary.
if leadTargetOverride != 0 {
leadTimeAtSender = leadTargetOverride
}

// Mark as synthetic, because this time is in the future.
return now.ToTimestamp().Add(leadTimeAtSender.Nanoseconds(), 0).WithSynthetic(true)
default:
panic("unexpected RangeClosedTimestampPolicy")
}
64 changes: 50 additions & 14 deletions pkg/kv/kvserver/closedts/policy_test.go
@@ -12,6 +12,7 @@ package closedts

import (
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -24,29 +25,64 @@ func TestTargetForPolicy(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const nowNanos = 100
const maxOffsetNanos = 20
const lagTargetNanos = 10
cast := func(i int, unit time.Duration) time.Duration { return time.Duration(i) * unit }
secs := func(i int) time.Duration { return cast(i, time.Second) }
millis := func(i int) time.Duration { return cast(i, time.Millisecond) }

now := hlc.Timestamp{WallTime: millis(100).Nanoseconds()}
maxClockOffset := millis(500)

for _, tc := range []struct {
rangePolicy roachpb.RangeClosedTimestampPolicy
expClosedTSTarget hlc.Timestamp
lagTargetNanos time.Duration
leadTargetOverride time.Duration
sideTransportCloseInterval time.Duration
rangePolicy roachpb.RangeClosedTimestampPolicy
expClosedTSTarget hlc.Timestamp
}{
{
lagTargetNanos: secs(3),
rangePolicy: roachpb.LAG_BY_CLUSTER_SETTING,
expClosedTSTarget: now.Add(-secs(3).Nanoseconds(), 0),
},
{
lagTargetNanos: secs(1),
rangePolicy: roachpb.LAG_BY_CLUSTER_SETTING,
expClosedTSTarget: hlc.Timestamp{WallTime: nowNanos - lagTargetNanos},
expClosedTSTarget: now.Add(-secs(1).Nanoseconds(), 0),
},
{
sideTransportCloseInterval: millis(200),
rangePolicy: roachpb.LEAD_FOR_GLOBAL_READS,
expClosedTSTarget: now.
Add((maxClockOffset +
millis(275) /* sideTransportPropTime */ +
millis(25) /* bufferTime */).Nanoseconds(), 0).
WithSynthetic(true),
},
{
sideTransportCloseInterval: millis(50),
rangePolicy: roachpb.LEAD_FOR_GLOBAL_READS,
expClosedTSTarget: now.
Add((maxClockOffset +
millis(245) /* raftTransportPropTime */ +
millis(25) /* bufferTime */).Nanoseconds(), 0).
WithSynthetic(true),
},
{
rangePolicy: roachpb.LEAD_FOR_GLOBAL_READS,
expClosedTSTarget: hlc.Timestamp{WallTime: nowNanos - lagTargetNanos},
// TODO(andrei, nvanbenschoten): What we should be expecting here is the following, once
// the propBuf starts properly implementing this timestamp closing policy:
// expClosedTSTarget: hlc.Timestamp{WallTime: nowNanos + 2*maxOffsetNanos, Synthetic: true},
leadTargetOverride: millis(1234),
sideTransportCloseInterval: millis(200),
rangePolicy: roachpb.LEAD_FOR_GLOBAL_READS,
expClosedTSTarget: now.Add(millis(1234).Nanoseconds(), 0).WithSynthetic(true),
},
} {
t.Run(tc.rangePolicy.String(), func(t *testing.T) {
now := hlc.ClockTimestamp{WallTime: nowNanos}
target := TargetForPolicy(now, maxOffsetNanos, lagTargetNanos, tc.rangePolicy)
t.Run("", func(t *testing.T) {
target := TargetForPolicy(
now.UnsafeToClockTimestamp(),
maxClockOffset,
tc.lagTargetNanos,
tc.leadTargetOverride,
tc.sideTransportCloseInterval,
tc.rangePolicy,
)
require.Equal(t, tc.expClosedTSTarget, target)
})
}
11 changes: 11 additions & 0 deletions pkg/kv/kvserver/closedts/setting.go
@@ -46,3 +46,14 @@ var SideTransportCloseInterval = settings.RegisterDurationSetting(
200*time.Millisecond,
settings.NonNegativeDuration,
)

// LeadForGlobalReadsOverride overrides the lead time that ranges with the
LEAD_FOR_GLOBAL_READS closed timestamp policy use to publish closed timestamps
// (see TargetForPolicy), if it is set to a non-zero value. Meant as an escape
// hatch.
var LeadForGlobalReadsOverride = settings.RegisterDurationSetting(
"kv.closed_timestamp.lead_for_global_reads_override",
"if nonzero, overrides the lead time that global_read ranges use to publish closed timestamps",
0,
settings.NonNegativeDuration,
)
11 changes: 10 additions & 1 deletion pkg/kv/kvserver/closedts/sidetransport/sender.go
@@ -266,9 +266,18 @@ func (s *Sender) publish(ctx context.Context) hlc.ClockTimestamp {
now := s.clock.NowAsClockTimestamp()
maxClockOffset := s.clock.MaxOffset()
lagTargetDuration := closedts.TargetDuration.Get(&s.st.SV)
leadTargetOverride := closedts.LeadForGlobalReadsOverride.Get(&s.st.SV)
sideTransportCloseInterval := closedts.SideTransportCloseInterval.Get(&s.st.SV)
for i := range s.trackedMu.lastClosed {
pol := roachpb.RangeClosedTimestampPolicy(i)
target := closedts.TargetForPolicy(now, maxClockOffset, lagTargetDuration, pol)
target := closedts.TargetForPolicy(
now,
maxClockOffset,
lagTargetDuration,
leadTargetOverride,
sideTransportCloseInterval,
pol,
)
s.trackedMu.lastClosed[pol] = target
msg.ClosedTimestamps[pol] = ctpb.Update_GroupUpdate{
Policy: pol,
11 changes: 8 additions & 3 deletions pkg/kv/kvserver/closedts/sidetransport/sender_test.go
@@ -89,10 +89,15 @@ func newMockReplica(id roachpb.RangeID, nodes ...roachpb.NodeID) *mockReplica {
}

func expGroupUpdates(s *Sender, now hlc.ClockTimestamp) []ctpb.Update_GroupUpdate {
maxClockOffset := s.clock.MaxOffset()
lagTargetDuration := closedts.TargetDuration.Get(&s.st.SV)
targetForPolicy := func(pol roachpb.RangeClosedTimestampPolicy) hlc.Timestamp {
return closedts.TargetForPolicy(now, maxClockOffset, lagTargetDuration, pol)
return closedts.TargetForPolicy(
now,
s.clock.MaxOffset(),
closedts.TargetDuration.Get(&s.st.SV),
closedts.LeadForGlobalReadsOverride.Get(&s.st.SV),
closedts.SideTransportCloseInterval.Get(&s.st.SV),
pol,
)
}
return []ctpb.Update_GroupUpdate{
{Policy: roachpb.LAG_BY_CLUSTER_SETTING, ClosedTimestamp: targetForPolicy(roachpb.LAG_BY_CLUSTER_SETTING)},
13 changes: 8 additions & 5 deletions pkg/kv/kvserver/replica_closedts.go
@@ -144,9 +144,12 @@ func (r *Replica) BumpSideTransportClosed(
// this range. Note that we might not be able to ultimately close this timestamp
// if there are requests in flight.
func (r *Replica) closedTimestampTargetRLocked() hlc.Timestamp {
now := r.Clock().NowAsClockTimestamp()
maxClockOffset := r.Clock().MaxOffset()
lagTargetDuration := closedts.TargetDuration.Get(&r.ClusterSettings().SV)
policy := r.closedTimestampPolicyRLocked()
return closedts.TargetForPolicy(now, maxClockOffset, lagTargetDuration, policy)
return closedts.TargetForPolicy(
r.Clock().NowAsClockTimestamp(),
r.Clock().MaxOffset(),
closedts.TargetDuration.Get(&r.ClusterSettings().SV),
closedts.LeadForGlobalReadsOverride.Get(&r.ClusterSettings().SV),
closedts.SideTransportCloseInterval.Get(&r.ClusterSettings().SV),
r.closedTimestampPolicyRLocked(),
)
}