kvserver: refactor closed ts target
Move a function out of the proposal buffer, so it can be shared between
the proposal buffer and the side transport.

Release note: None
andreimatei committed Feb 25, 2021
1 parent 3ddfe0d commit 8392017
Showing 4 changed files with 94 additions and 71 deletions.
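
As the commit message says, the target computation is being centralized so that both the proposal buffer and the side transport can derive the same closed-timestamp target. Below is a minimal, self-contained Go sketch of that computation — simplified stand-in types rather than the real roachpb/hlc ones, and a hypothetical second caller standing in for the side transport — included only to illustrate the arithmetic and the intended sharing; the authoritative version is closedTimestampTargetByPolicy in the diff below.

package main

import (
	"fmt"
	"time"
)

// Illustrative stand-ins for roachpb.RangeClosedTimestampPolicy and
// hlc.Timestamp; not the real CockroachDB types.
type closedTimestampPolicy int

const (
	lagByClusterSetting closedTimestampPolicy = iota
	leadForGlobalReads
)

type timestamp struct {
	WallTime  int64 // nanoseconds
	Synthetic bool
}

// targetByPolicy mirrors the shared helper introduced by this commit: for now,
// both policies simply lag the current clock reading by the configured target
// duration. Per the TODO in the real code, LEAD_FOR_GLOBAL_READS is meant to
// eventually close timestamps in the future (roughly now + 2*maxOffset,
// synthetic), but that branch is not enabled yet.
func targetByPolicy(nowNanos int64, policy closedTimestampPolicy, lag time.Duration) timestamp {
	switch policy {
	case lagByClusterSetting, leadForGlobalReads:
		return timestamp{WallTime: nowNanos - lag.Nanoseconds()}
	}
	return timestamp{}
}

func main() {
	const nowNanos = 100
	const lag = 10 * time.Nanosecond
	// Both intended callers share one computation: the proposal buffer when
	// flushing proposals, and (hypothetically, as sketched here) the side
	// transport when publishing closed timestamps for idle ranges.
	fmt.Println(targetByPolicy(nowNanos, lagByClusterSetting, lag)) // {90 false}
	fmt.Println(targetByPolicy(nowNanos, leadForGlobalReads, lag))  // {90 false}
}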
37 changes: 37 additions & 0 deletions pkg/kv/kvserver/replica_closedts.go
@@ -12,8 +12,12 @@ package kvserver

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/ctpb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// EmitMLAI registers the replica's last assigned max lease index with the
@@ -57,3 +61,36 @@ func (r *Replica) EmitMLAI() {
}
}
}

// closedTimestampTargetRLocked computes the timestamp we'd like to close for
// this range. Note that we might not be able to ultimately close this timestamp
// if there are requests in flight.
func (r *Replica) closedTimestampTargetRLocked() hlc.Timestamp {
	now := r.Clock().NowAsClockTimestamp()
	policy := r.closedTimestampPolicyRLocked()
	lagTargetDuration := closedts.TargetDuration.Get(&r.ClusterSettings().SV)
	return closedTimestampTargetByPolicy(now, policy, lagTargetDuration)
}

// closedTimestampTargetByPolicy returns the target closed timestamp for a range
// with the given policy.
func closedTimestampTargetByPolicy(
	now hlc.ClockTimestamp,
	policy roachpb.RangeClosedTimestampPolicy,
	lagTargetDuration time.Duration,
) hlc.Timestamp {
	var closedTSTarget hlc.Timestamp
	switch policy {
	case roachpb.LAG_BY_CLUSTER_SETTING, roachpb.LEAD_FOR_GLOBAL_READS:
		closedTSTarget = hlc.Timestamp{WallTime: now.WallTime - lagTargetDuration.Nanoseconds()}
		// TODO(andrei,nvanbenschoten): Resolve all the issues preventing us from closing
		// timestamps in the future (which, in turn, forces future-time writes on
		// global ranges), and enable the proper logic below.
		//case roachpb.LEAD_FOR_GLOBAL_READS:
		//	closedTSTarget = hlc.Timestamp{
		//		WallTime: now + 2*b.clock.MaxOffset().Nanoseconds(),
		//		Synthetic: true,
		//	}
	}
	return closedTSTarget
}
43 changes: 43 additions & 0 deletions pkg/kv/kvserver/replica_closedts_test.go
@@ -0,0 +1,43 @@
package kvserver

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
	"github.com/cockroachdb/cockroach/pkg/util/log"
	"github.com/stretchr/testify/require"
)

func TestClosedTimestampTargetByPolicy(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	const nowNanos = 100
	const maxOffsetNanos = 20
	manualClock := hlc.NewManualClock(nowNanos)
	clock := hlc.NewClock(manualClock.UnixNano, maxOffsetNanos)
	const lagTargetNanos = 10

	for _, tc := range []struct {
		rangePolicy       roachpb.RangeClosedTimestampPolicy
		expClosedTSTarget hlc.Timestamp
	}{
		{
			rangePolicy:       roachpb.LAG_BY_CLUSTER_SETTING,
			expClosedTSTarget: hlc.Timestamp{WallTime: nowNanos - lagTargetNanos},
		},
		{
			rangePolicy:       roachpb.LEAD_FOR_GLOBAL_READS,
			expClosedTSTarget: hlc.Timestamp{WallTime: nowNanos - lagTargetNanos},
			// TODO(andrei, nvanbenschoten): What we should be expecting here is the following, once
			// the propBuf starts properly implementing this timestamp closing policy:
			// expClosedTSTarget: hlc.Timestamp{WallTime: nowNanos + 2*maxOffsetNanos, Synthetic: true},
		},
	} {
		t.Run(tc.rangePolicy.String(), func(t *testing.T) {
			require.Equal(t, tc.expClosedTSTarget, closedTimestampTargetByPolicy(clock.NowAsClockTimestamp(), tc.rangePolicy, lagTargetNanos))
		})
	}
}
32 changes: 4 additions & 28 deletions pkg/kv/kvserver/replica_proposal_buf.go
@@ -16,7 +16,6 @@ import (
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/tracker"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -213,7 +212,7 @@ type proposer interface {
destroyed() destroyStatus
leaseAppliedIndex() uint64
enqueueUpdateCheck()
closeTimestampPolicy() roachpb.RangeClosedTimestampPolicy
closedTimestampTarget() hlc.Timestamp
// raftTransportClosedTimestampEnabled returns whether the range has switched
// to the Raft-based closed timestamp transport.
// TODO(andrei): This shouldn't be needed any more in 21.2, once the Raft
@@ -518,7 +517,7 @@ func (b *propBuf) FlushLockedWithRaftGroup(
}
}

closedTSTarget := b.computeClosedTimestampTarget()
closedTSTarget := b.p.closedTimestampTarget()

// Remember the first error that we see when proposing the batch. We don't
// immediately return this error because we want to finish clearing out the
@@ -677,29 +676,6 @@ func (b *propBuf) FlushLockedWithRaftGroup(
return used, proposeBatch(raftGroup, b.p.replicaID(), ents)
}

// computeClosedTimestampTarget computes the timestamp we'd like to close for
// our range. Note that we might not be able to ultimately close this timestamp
// if there's requests in flight.
func (b *propBuf) computeClosedTimestampTarget() hlc.Timestamp {
	now := b.clock.Now().WallTime
	closedTSPolicy := b.p.closeTimestampPolicy()
	var closedTSTarget hlc.Timestamp
	switch closedTSPolicy {
	case roachpb.LAG_BY_CLUSTER_SETTING, roachpb.LEAD_FOR_GLOBAL_READS:
		targetDuration := closedts.TargetDuration.Get(&b.settings.SV)
		closedTSTarget = hlc.Timestamp{WallTime: now - targetDuration.Nanoseconds()}
		// TODO(andrei,nvanbenschoten): Resolve all the issues preventing us from closing
		// timestamps in the future (which, in turn, forces future-time writes on
		// global ranges), and enable the proper logic below.
		//case roachpb.LEAD_FOR_GLOBAL_READS:
		//	closedTSTarget = hlc.Timestamp{
		//		WallTime: now + 2*b.clock.MaxOffset().Nanoseconds(),
		//		Synthetic: true,
		//	}
	}
	return closedTSTarget
}

// assignClosedTimestampToProposalLocked assigns a closed timestamp to be carried by
// an outgoing proposal.
//
@@ -1027,8 +1003,8 @@ func (rp *replicaProposer) enqueueUpdateCheck() {
rp.store.enqueueRaftUpdateCheck(rp.RangeID)
}

func (rp *replicaProposer) closeTimestampPolicy() roachpb.RangeClosedTimestampPolicy {
return (*Replica)(rp).closedTimestampPolicyRLocked()
func (rp *replicaProposer) closedTimestampTarget() hlc.Timestamp {
return (*Replica)(rp).closedTimestampTargetRLocked()
}

func (rp *replicaProposer) raftTransportClosedTimestampEnabled() bool {
53 changes: 10 additions & 43 deletions pkg/kv/kvserver/replica_proposal_buf_test.go
@@ -60,6 +60,7 @@ type testProposer struct {
// is. Some types of replicas are not eligible to get a lease.
leaderReplicaType roachpb.ReplicaType
rangePolicy roachpb.RangeClosedTimestampPolicy
clock *hlc.Clock
}

var _ proposer = &testProposer{}
@@ -120,8 +121,11 @@ func (t *testProposer) enqueueUpdateCheck() {
t.enqueued++
}

func (t *testProposer) closeTimestampPolicy() roachpb.RangeClosedTimestampPolicy {
return t.rangePolicy
func (t *testProposer) closedTimestampTarget() hlc.Timestamp {
if t.clock == nil {
return hlc.Timestamp{}
}
return closedTimestampTargetByPolicy(t.clock.NowAsClockTimestamp(), t.rangePolicy, time.Second)
}

func (t *testProposer) raftTransportClosedTimestampEnabled() bool {
@@ -651,45 +655,6 @@ func TestProposalBufferRejectLeaseAcqOnFollower(t *testing.T) {
}
}

func TestProposalBufferComputeClosedTimestampTarget(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	const nowNanos = 100
	const maxOffsetNanos = 20
	manualClock := hlc.NewManualClock(nowNanos)
	clock := hlc.NewClock(manualClock.UnixNano, maxOffsetNanos)

	const lagTargetNanos = 10
	st := cluster.MakeTestingClusterSettings()
	closedts.TargetDuration.Override(&st.SV, lagTargetNanos)

	for _, tc := range []struct {
		rangePolicy       roachpb.RangeClosedTimestampPolicy
		expClosedTSTarget hlc.Timestamp
	}{
		{
			rangePolicy:       roachpb.LAG_BY_CLUSTER_SETTING,
			expClosedTSTarget: hlc.Timestamp{WallTime: nowNanos - lagTargetNanos},
		},
		{
			rangePolicy:       roachpb.LEAD_FOR_GLOBAL_READS,
			expClosedTSTarget: hlc.Timestamp{WallTime: nowNanos - lagTargetNanos},
			// TODO(andrei, nvanbenschoten): What we should be expecting here is the following, once
			// the propBuf starts properly implementing this timestamp closing policy:
			// expClosedTSTarget: hlc.Timestamp{WallTime: nowNanos + 2*maxOffsetNanos, Synthetic: true},
		},
	} {
		t.Run(tc.rangePolicy.String(), func(t *testing.T) {
			var p testProposer
			p.rangePolicy = tc.rangePolicy
			var b propBuf
			b.Init(&p, tracker.NewLockfreeTracker(), clock, st)
			require.Equal(t, tc.expClosedTSTarget, b.computeClosedTimestampTarget())
		})
	}
}

// Test that the propBuf properly assigns closed timestamps to proposals being
// flushed out of it. Each subtest proposes one command and checks for the
// expected closed timestamp being written to the proposal by the propBuf.
@@ -860,8 +825,10 @@ func TestProposalBufferClosedTimestamp(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
r := &testProposerRaft{}
p := testProposer{
lai: 10,
raftGroup: r,
lai: 10,
raftGroup: r,
rangePolicy: tc.rangePolicy,
clock: clock,
}
tracker := mockTracker{
lowerBound: tc.trackerLowerBound,
