Skip to content

Commit

Permalink
kvserver: introduce a Raft-based transport for closedts
Browse files Browse the repository at this point in the history
This patch introduces a replacement for the existing closed timestamp
mechanism / transport. The new mechanism is gated by a cluster version.

Raft commands now carry increasing closed timestamps generated by the
propBuf by using the recent request Tracker for synchronizing with
in-flight requests (i.e. not closing timestamps below them).
Raft commands get a closed ts field, and the range state gets the field
as well.

The propBuf pays attention to the range's closed timestamp policy for
deciding whether to close lagging or leading timestamps.

Fixes #57395, #57396
Touches #57405

Release note: None
  • Loading branch information
andreimatei committed Feb 20, 2021
1 parent 98f1479 commit 0668efb
Show file tree
Hide file tree
Showing 33 changed files with 1,718 additions and 418 deletions.
7 changes: 7 additions & 0 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ const (
// database, such as adding REGIONS to a DATABASE or setting the LOCALITY
// on a TABLE.
MultiRegionFeatures
// ClosedTimestampsRaftTransport enables the Raft transport for closed
// timestamps and disables the previous per-node transport.
ClosedTimestampsRaftTransport

// Step (1): Add new versions here.
)
Expand Down Expand Up @@ -426,6 +429,10 @@ var versionsSingleton = keyedVersions([]keyedVersion{
Key: MultiRegionFeatures,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 34},
},
{
Key: ClosedTimestampsRaftTransport,
Version: roachpb.Version{Major: 20, Minor: 2, Internal: 36},
},
// Step (2): Add new versions here.
})

Expand Down
5 changes: 3 additions & 2 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pkg/kv/kvserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ go_library(
"//pkg/kv/kvserver/closedts/container",
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/kv/kvserver/closedts/storage",
"//pkg/kv/kvserver/closedts/tracker",
"//pkg/kv/kvserver/concurrency",
"//pkg/kv/kvserver/constraint",
"//pkg/kv/kvserver/gc",
Expand Down Expand Up @@ -299,6 +300,7 @@ go_test(
"//pkg/kv/kvserver/batcheval/result",
"//pkg/kv/kvserver/closedts",
"//pkg/kv/kvserver/closedts/ctpb",
"//pkg/kv/kvserver/closedts/tracker",
"//pkg/kv/kvserver/concurrency",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/constraint",
Expand Down
7 changes: 1 addition & 6 deletions pkg/kv/kvserver/below_raft_protos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,8 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
populatedConstructor: func(r *rand.Rand) protoutil.Message {
return enginepb.NewPopulatedRangeAppliedState(r, false)
},
// The populatedSum has changed from 10390885694280604642 to
// 7958815789228166749, as of 21.1, due to the addition of the
// SeparatedIntentCount field in MVCCStats. This field will not actually
// be populated until all nodes are on 21.1, so there isn't a risk of
// divergence.
emptySum: 615555020845646359,
populatedSum: 7958815789228166749,
populatedSum: 3253881774919630461,
},
reflect.TypeOf(&raftpb.HardState{}): {
populatedConstructor: func(r *rand.Rand) protoutil.Message {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_replica_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3332,7 +3332,7 @@ func TestProposalOverhead(t *testing.T) {
// overhead is that user ranges do not have rangefeeds on by default whereas
// system ranges do.
const (
expectedUserOverhead uint32 = 42
expectedUserOverhead uint32 = 45
)
t.Run("user-key overhead", func(t *testing.T) {
userKey := tc.ScratchRange(t)
Expand Down
7 changes: 7 additions & 0 deletions pkg/kv/kvserver/closedts/tracker/heap_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,3 +124,10 @@ func (h *heapTracker) LowerBound(ctx context.Context) hlc.Timestamp {
}
return h.mu.rs[0].ts
}

// Count implements the Tracker interface. It reports the length of the rs
// collection, reading it while h.mu is held.
func (h *heapTracker) Count() int {
	h.mu.Lock()
	n := h.mu.rs.Len()
	h.mu.Unlock()
	return n
}
9 changes: 9 additions & 0 deletions pkg/kv/kvserver/closedts/tracker/lockfree_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

Expand Down Expand Up @@ -169,6 +170,9 @@ func (t *lockfreeTracker) Untrack(ctx context.Context, tok RemovalToken) {
b := tok.(lockfreeToken).b
// Note that atomic ops are not required here, as we hold the exclusive lock.
b.refcnt--
if b.refcnt < 0 {
log.Fatalf(ctx, "negative bucket refcount: %d", b.refcnt)
}
if b.refcnt == 0 {
// Reset the bucket, so that future Track() calls can create a new one.
b.ts = 0
Expand Down Expand Up @@ -198,6 +202,11 @@ func (t *lockfreeTracker) LowerBound(ctx context.Context) hlc.Timestamp {
}
}

// Count implements the Tracker interface by adding up the reference counts of
// the two buckets, b1 and b2.
//
// The counts are read with plain (non-atomic) loads; the Tracker interface
// documents that Count cannot be called concurrently with other methods.
func (t *lockfreeTracker) Count() int {
	first, second := int(t.b1.refcnt), int(t.b2.refcnt)
	return first + second
}

// bucket represents a Tracker bucket: a data structure that coalesces a number
// of timestamps, keeping track only of their count and minimum.
//
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv/kvserver/closedts/tracker/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,11 @@ type Tracker interface {
// make is that, if no synthetic timestamp is inserted into the tracked set
// for a while, eventually the LowerBound value will not be synthetic.
LowerBound(context.Context) hlc.Timestamp

// Count returns the current size of the tracked set.
//
// Count cannot be called concurrently with other methods.
Count() int
}

// RemovalToken represents the result of Track: a token to be later used with
Expand Down
20 changes: 19 additions & 1 deletion pkg/kv/kvserver/kvserverpb/proposer_kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,36 @@

package kvserverpb

import "math"
import (
"math"

"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// Upper bounds on footer sizes. Each is computed once, at package
// initialization, by calling Size() on a footer populated with the maximal
// field values shown below.
var (
	// maxRaftCommandFooterSize is the Size() of a RaftCommandFooter whose
	// MaxLeaseIndex is the largest uint64.
	maxRaftCommandFooterSize = (&RaftCommandFooter{MaxLeaseIndex: math.MaxUint64}).Size()

	// maxClosedTimestampFooterSize is the Size() of a ClosedTimestampFooter
	// whose ClosedTimestamp carries the largest wall time and logical
	// component and has the Synthetic flag set.
	maxClosedTimestampFooterSize = (&ClosedTimestampFooter{
		ClosedTimestamp: hlc.Timestamp{
			Synthetic: true,
			Logical:   math.MaxInt32,
			WallTime:  math.MaxInt64,
		},
	}).Size()
)

// MaxRaftCommandFooterSize returns the maximum possible size of an
// encoded RaftCommandFooter proto.
//
// The value is read from the package-level maxRaftCommandFooterSize variable,
// which is computed from a footer whose MaxLeaseIndex is math.MaxUint64, so
// this call does no encoding work of its own.
func MaxRaftCommandFooterSize() int {
return maxRaftCommandFooterSize
}

// MaxClosedTimestampFooterSize returns the maximum possible size of an encoded
// ClosedTimestampFooter.
//
// The value is read from the package-level maxClosedTimestampFooterSize
// variable, which is computed from a footer whose ClosedTimestamp has
// WallTime math.MaxInt64, Logical math.MaxInt32 and Synthetic set to true.
func MaxClosedTimestampFooterSize() int {
return maxClosedTimestampFooterSize
}

// IsZero returns whether all fields are set to their zero value.
func (r ReplicatedEvalResult) IsZero() bool {
return r == ReplicatedEvalResult{}
Expand Down
Loading

0 comments on commit 0668efb

Please sign in to comment.