diff --git a/pkg/kv/kvserver/replica_closedts.go b/pkg/kv/kvserver/replica_closedts.go index 06a460996c03..3d29397537b3 100644 --- a/pkg/kv/kvserver/replica_closedts.go +++ b/pkg/kv/kvserver/replica_closedts.go @@ -12,8 +12,12 @@ package kvserver import ( "context" + "time" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" ) // EmitMLAI registers the replica's last assigned max lease index with the @@ -57,3 +61,36 @@ func (r *Replica) EmitMLAI() { } } } + +// closedTimestampTargetRLocked computes the timestamp we'd like to close for +// this range. Note that we might not be able to ultimately close this timestamp +// if there are requests in flight. +func (r *Replica) closedTimestampTargetRLocked() hlc.Timestamp { + now := r.Clock().NowAsClockTimestamp() + policy := r.closedTimestampPolicyRLocked() + lagTargetDuration := closedts.TargetDuration.Get(&r.ClusterSettings().SV) + return closedTimestampTargetByPolicy(now, policy, lagTargetDuration) +} + +// closedTimestampTargetByPolicy returns the target closed timestamp for a range +// with the given policy. +func closedTimestampTargetByPolicy( + now hlc.ClockTimestamp, + policy roachpb.RangeClosedTimestampPolicy, + lagTargetDuration time.Duration, +) hlc.Timestamp { + var closedTSTarget hlc.Timestamp + switch policy { + case roachpb.LAG_BY_CLUSTER_SETTING, roachpb.LEAD_FOR_GLOBAL_READS: + closedTSTarget = hlc.Timestamp{WallTime: now.WallTime - lagTargetDuration.Nanoseconds()} + // TODO(andrei,nvanbenschoten): Resolve all the issues preventing us from closing + // timestamps in the future (which, in turn, forces future-time writes on + // global ranges), and enable the proper logic below. + //case roachpb.LEAD_FOR_GLOBAL_READS: + // closedTSTarget = hlc.Timestamp{ + // WallTime: now + 2*b.clock.MaxOffset().Nanoseconds(), + // Synthetic: true, + // } + } + return closedTSTarget +} diff --git a/pkg/kv/kvserver/replica_closedts_test.go b/pkg/kv/kvserver/replica_closedts_test.go new file mode 100644 index 000000000000..7ff1d17a1e44 --- /dev/null +++ b/pkg/kv/kvserver/replica_closedts_test.go @@ -0,0 +1,43 @@ +package kvserver + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func TestClosedTimestampTargetByPolicy(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const nowNanos = 100 + const maxOffsetNanos = 20 + manualClock := hlc.NewManualClock(nowNanos) + clock := hlc.NewClock(manualClock.UnixNano, maxOffsetNanos) + const lagTargetNanos = 10 + + for _, tc := range []struct { + rangePolicy roachpb.RangeClosedTimestampPolicy + expClosedTSTarget hlc.Timestamp + }{ + { + rangePolicy: roachpb.LAG_BY_CLUSTER_SETTING, + expClosedTSTarget: hlc.Timestamp{WallTime: nowNanos - lagTargetNanos}, + }, + { + rangePolicy: roachpb.LEAD_FOR_GLOBAL_READS, + expClosedTSTarget: hlc.Timestamp{WallTime: nowNanos - lagTargetNanos}, + // TODO(andrei, nvanbenschoten): What we should be expecting here is the following, once + // the propBuf starts properly implementing this timestamp closing policy: + // expClosedTSTarget: hlc.Timestamp{WallTime: nowNanos + 2*maxOffsetNanos, Synthetic: true}, + }, + } { + t.Run(tc.rangePolicy.String(), func(t *testing.T) { + require.Equal(t, tc.expClosedTSTarget, closedTimestampTargetByPolicy(clock.NowAsClockTimestamp(), tc.rangePolicy, lagTargetNanos)) + }) + } +} diff --git a/pkg/kv/kvserver/replica_proposal_buf.go b/pkg/kv/kvserver/replica_proposal_buf.go index 0d99ac4225ea..37558e97222b 100644 --- a/pkg/kv/kvserver/replica_proposal_buf.go +++ b/pkg/kv/kvserver/replica_proposal_buf.go @@ -16,7 +16,6 @@ import ( "sync/atomic" "github.com/cockroachdb/cockroach/pkg/clusterversion" - "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -213,7 +212,7 @@ type proposer interface { destroyed() destroyStatus leaseAppliedIndex() uint64 enqueueUpdateCheck() - closeTimestampPolicy() roachpb.RangeClosedTimestampPolicy + closedTimestampTarget() hlc.Timestamp // raftTransportClosedTimestampEnabled returns whether the range has switched // to the Raft-based closed timestamp transport. // TODO(andrei): This shouldn't be needed any more in 21.2, once the Raft @@ -518,7 +517,7 @@ func (b *propBuf) FlushLockedWithRaftGroup( } } - closedTSTarget := b.computeClosedTimestampTarget() + closedTSTarget := b.p.closedTimestampTarget() // Remember the first error that we see when proposing the batch. We don't // immediately return this error because we want to finish clearing out the @@ -677,29 +676,6 @@ func (b *propBuf) FlushLockedWithRaftGroup( return used, proposeBatch(raftGroup, b.p.replicaID(), ents) } -// computeClosedTimestampTarget computes the timestamp we'd like to close for -// our range. Note that we might not be able to ultimately close this timestamp -// if there's requests in flight. -func (b *propBuf) computeClosedTimestampTarget() hlc.Timestamp { - now := b.clock.Now().WallTime - closedTSPolicy := b.p.closeTimestampPolicy() - var closedTSTarget hlc.Timestamp - switch closedTSPolicy { - case roachpb.LAG_BY_CLUSTER_SETTING, roachpb.LEAD_FOR_GLOBAL_READS: - targetDuration := closedts.TargetDuration.Get(&b.settings.SV) - closedTSTarget = hlc.Timestamp{WallTime: now - targetDuration.Nanoseconds()} - // TODO(andrei,nvanbenschoten): Resolve all the issues preventing us from closing - // timestamps in the future (which, in turn, forces future-time writes on - // global ranges), and enable the proper logic below. - //case roachpb.LEAD_FOR_GLOBAL_READS: - // closedTSTarget = hlc.Timestamp{ - // WallTime: now + 2*b.clock.MaxOffset().Nanoseconds(), - // Synthetic: true, - // } - } - return closedTSTarget -} - // assignClosedTimestampToProposalLocked assigns a closed timestamp to be carried by // an outgoing proposal. // @@ -1027,8 +1003,8 @@ func (rp *replicaProposer) enqueueUpdateCheck() { rp.store.enqueueRaftUpdateCheck(rp.RangeID) } -func (rp *replicaProposer) closeTimestampPolicy() roachpb.RangeClosedTimestampPolicy { - return (*Replica)(rp).closedTimestampPolicyRLocked() +func (rp *replicaProposer) closedTimestampTarget() hlc.Timestamp { + return (*Replica)(rp).closedTimestampTargetRLocked() } func (rp *replicaProposer) raftTransportClosedTimestampEnabled() bool { diff --git a/pkg/kv/kvserver/replica_proposal_buf_test.go b/pkg/kv/kvserver/replica_proposal_buf_test.go index 62de4020aeae..e6e092124115 100644 --- a/pkg/kv/kvserver/replica_proposal_buf_test.go +++ b/pkg/kv/kvserver/replica_proposal_buf_test.go @@ -60,6 +60,7 @@ type testProposer struct { // is. Some types of replicas are not eligible to get a lease. leaderReplicaType roachpb.ReplicaType rangePolicy roachpb.RangeClosedTimestampPolicy + clock *hlc.Clock } var _ proposer = &testProposer{} @@ -120,8 +121,11 @@ func (t *testProposer) enqueueUpdateCheck() { t.enqueued++ } -func (t *testProposer) closeTimestampPolicy() roachpb.RangeClosedTimestampPolicy { - return t.rangePolicy +func (t *testProposer) closedTimestampTarget() hlc.Timestamp { + if t.clock == nil { + return hlc.Timestamp{} + } + return closedTimestampTargetByPolicy(t.clock.NowAsClockTimestamp(), t.rangePolicy, time.Second) } func (t *testProposer) raftTransportClosedTimestampEnabled() bool { @@ -651,45 +655,6 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) { } } -func TestProposalBufferComputeClosedTimestampTarget(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - const nowNanos = 100 - const maxOffsetNanos = 20 - manualClock := hlc.NewManualClock(nowNanos) - clock := hlc.NewClock(manualClock.UnixNano, maxOffsetNanos) - - const lagTargetNanos = 10 - st := cluster.MakeTestingClusterSettings() - closedts.TargetDuration.Override(&st.SV, lagTargetNanos) - - for _, tc := range []struct { - rangePolicy roachpb.RangeClosedTimestampPolicy - expClosedTSTarget hlc.Timestamp - }{ - { - rangePolicy: roachpb.LAG_BY_CLUSTER_SETTING, - expClosedTSTarget: hlc.Timestamp{WallTime: nowNanos - lagTargetNanos}, - }, - { - rangePolicy: roachpb.LEAD_FOR_GLOBAL_READS, - expClosedTSTarget: hlc.Timestamp{WallTime: nowNanos - lagTargetNanos}, - // TODO(andrei, nvanbenschoten): What we should be expecting here is the following, once - // the propBuf starts properly implementing this timestamp closing policy: - // expClosedTSTarget: hlc.Timestamp{WallTime: nowNanos + 2*maxOffsetNanos, Synthetic: true}, - }, - } { - t.Run(tc.rangePolicy.String(), func(t *testing.T) { - var p testProposer - p.rangePolicy = tc.rangePolicy - var b propBuf - b.Init(&p, tracker.NewLockfreeTracker(), clock, st) - require.Equal(t, tc.expClosedTSTarget, b.computeClosedTimestampTarget()) - }) - } -} - // Test that the propBuf properly assigns closed timestamps to proposals being // flushed out of it. Each subtest proposes one command and checks for the // expected closed timestamp being written to the proposal by the propBuf. @@ -860,8 +825,10 @@ func TestProposalBufferClosedTimestamp(t *testing.T) { t.Run(tc.name, func(t *testing.T) { r := &testProposerRaft{} p := testProposer{ - lai: 10, - raftGroup: r, + lai: 10, + raftGroup: r, + rangePolicy: tc.rangePolicy, + clock: clock, } tracker := mockTracker{ lowerBound: tc.trackerLowerBound,