Skip to content

Commit

Permalink
rangedesciter: support scoped iteration
Browse files Browse the repository at this point in the history
Informs #87503; adds the ability to only paginate through range
descriptors in the system that overlap with some given span. We're going
to use it in future commits as part of multi-tenant replication reports
(#89987) where we'll need to iterate over the set of range descriptors
that pertain only to some span of interest (for a given index,
partition, tenant, etc.)

Release note: None
  • Loading branch information
irfansharif committed Nov 23, 2022
1 parent 1d04cec commit 98ab461
Show file tree
Hide file tree
Showing 16 changed files with 530 additions and 56 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ func TestTruncate(t *testing.T) {
desc.EndKey = roachpb.RKey(test.to)
}
rs := roachpb.RSpan{Key: roachpb.RKey(test.from), EndKey: roachpb.RKey(test.to)}
rs, err := rs.Intersect(desc)
rs, err := rs.Intersect(desc.RSpan())
if err != nil {
t.Errorf("%d: intersection failure: %v", i, err)
continue
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1382,7 +1382,7 @@ func (ds *DistSender) divideAndSendBatchToRanges(
responseChs = append(responseChs, responseCh)

// Truncate the request to range descriptor.
curRangeRS, err := rs.Intersect(ri.Token().Desc())
curRangeRS, err := rs.Intersect(ri.Token().Desc().RSpan())
if err != nil {
responseCh <- response{pErr: roachpb.NewError(err)}
return
Expand Down Expand Up @@ -1632,7 +1632,7 @@ func (ds *DistSender) sendPartialBatch(
// batch, so that we know that the response to it matches the positions
// into our batch (using the full batch here would give a potentially
// larger response slice with unknown mapping to our truncated reply).
intersection, err := rs.Intersect(routingTok.Desc())
intersection, err := rs.Intersect(routingTok.Desc().RSpan())
if err != nil {
return response{pErr: roachpb.NewError(err)}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func (ds *DistSender) divideAndSendRangeFeedToRanges(
ri := MakeRangeIterator(ds)
for ri.Seek(ctx, nextRS.Key, Ascending); ri.Valid(); ri.Next(ctx) {
desc := ri.Desc()
partialRS, err := nextRS.Intersect(desc)
partialRS, err := nextRS.Intersect(desc.RSpan())
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvstreamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,7 +551,7 @@ func (s *Streamer) Enqueue(ctx context.Context, reqs []roachpb.RequestUnion) (re
rs.Key = roachpb.RKeyMax
} else {
// Truncate the request span to the current range.
singleRangeSpan, err := rs.Intersect(ri.Token().Desc())
singleRangeSpan, err := rs.Intersect(ri.Token().Desc().RSpan())
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/kvclient/rangecache/range_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1424,7 +1424,7 @@ func (e *CacheEntry) LeaseSpeculative() bool {
// "speculative" (sequence=0).
func (e *CacheEntry) overrides(o *CacheEntry) bool {
if util.RaceEnabled {
if _, err := e.Desc().RSpan().Intersect(o.Desc()); err != nil {
if _, err := e.Desc().RSpan().Intersect(o.Desc().RSpan()); err != nil {
panic(fmt.Sprintf("descriptors don't intersect: %s vs %s", e.Desc(), o.Desc()))
}
}
Expand Down Expand Up @@ -1464,7 +1464,7 @@ func (e *CacheEntry) overrides(o *CacheEntry) bool {
// older; this matches the semantics of b.overrides(a).
func compareEntryDescs(a, b *CacheEntry) int {
if util.RaceEnabled {
if _, err := a.Desc().RSpan().Intersect(b.Desc()); err != nil {
if _, err := a.Desc().RSpan().Intersect(b.Desc().RSpan()); err != nil {
panic(fmt.Sprintf("descriptors don't intersect: %s vs %s", a.Desc(), b.Desc()))
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/rangefeed/db_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (dbc *dbAdapter) divideAndSendScanRequests(

for ri.Seek(ctx, nextRS.Key, kvcoord.Ascending); ri.Valid(); ri.Next(ctx) {
desc := ri.Desc()
partialRS, err := nextRS.Intersect(desc)
partialRS, err := nextRS.Intersect(desc.RSpan())
if err != nil {
return err
}
Expand Down
18 changes: 9 additions & 9 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2395,20 +2395,20 @@ func (rs RSpan) String() string {
}

// Intersect returns the intersection of the current span and the
// descriptor's range. Returns an error if the span and the
// descriptor's range do not overlap.
func (rs RSpan) Intersect(desc *RangeDescriptor) (RSpan, error) {
if !rs.Key.Less(desc.EndKey) || !desc.StartKey.Less(rs.EndKey) {
return rs, errors.Errorf("span and descriptor's range do not overlap: %s vs %s", rs, desc)
// given range. Returns an error if the span and the range do not
// overlap.
func (rs RSpan) Intersect(rspan RSpan) (RSpan, error) {
if !rs.Key.Less(rspan.EndKey) || !rspan.Key.Less(rs.EndKey) {
return rs, errors.Errorf("spans do not overlap: %s vs %s", rs, rspan)
}

key := rs.Key
if key.Less(desc.StartKey) {
key = desc.StartKey
if key.Less(rspan.Key) {
key = rspan.Key
}
endKey := rs.EndKey
if !desc.ContainsKeyRange(desc.StartKey, endKey) {
endKey = desc.EndKey
if !rspan.ContainsKeyRange(rspan.Key, endKey) {
endKey = rspan.EndKey
}
return RSpan{key, endKey}, nil
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/roachpb/data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1482,7 +1482,7 @@ func TestRSpanIntersect(t *testing.T) {
desc.StartKey = test.startKey
desc.EndKey = test.endKey

actual, err := rs.Intersect(&desc)
actual, err := rs.Intersect(desc.RSpan())
if err != nil {
t.Error(err)
continue
Expand All @@ -1504,7 +1504,7 @@ func TestRSpanIntersect(t *testing.T) {
desc := RangeDescriptor{}
desc.StartKey = test.startKey
desc.EndKey = test.endKey
if _, err := rs.Intersect(&desc); err == nil {
if _, err := rs.Intersect(desc.RSpan()); err == nil {
t.Errorf("%d: unexpected success", i)
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/upgrade/upgradecluster/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/upgrade/upgradecluster",
visibility = ["//visibility:public"],
deps = [
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
Expand Down
3 changes: 2 additions & 1 deletion pkg/upgrade/upgradecluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ package upgradecluster
import (
"context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
Expand Down Expand Up @@ -145,5 +146,5 @@ func (c *Cluster) ForEveryNode(
func (c *Cluster) IterateRangeDescriptors(
ctx context.Context, blockSize int, init func(), fn func(...roachpb.RangeDescriptor) error,
) error {
return c.c.RangeDescIterator.Iterate(ctx, blockSize, init, fn)
return c.c.RangeDescIterator.Iterate(ctx, blockSize, init, keys.EverythingSpan, fn)
}
5 changes: 5 additions & 0 deletions pkg/util/rangedesciter/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//pkg/keys",
"//pkg/kv",
"//pkg/roachpb",
"//pkg/util/iterutil",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand All @@ -21,6 +22,7 @@ go_test(
"rangedesciter_test.go",
],
args = ["-test.timeout=295s"],
data = glob(["testdata/**"]),
deps = [
":rangedesciter",
"//pkg/keys",
Expand All @@ -30,9 +32,12 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_stretchr_testify//require",
],
)

Expand Down
95 changes: 75 additions & 20 deletions pkg/util/rangedesciter/rangedesciter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,17 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/errors"
)

// Iterator paginates through every range descriptor in the system.
// Iterator paginates through range descriptors in the system.
type Iterator interface {
// Iterate paginates through range descriptors in the system using the given
// page size. It's important to note that the closure is being executed in
// the context of a distributed transaction that may be automatically
// retried. So something like the following is an anti-pattern:
// Iterate paginates through range descriptors in the system that overlap
// with the given span. When doing so it uses the given page size. It's
// important to note that the closure is being executed in the context of a
// distributed transaction that may be automatically retried. So something
// like the following is an anti-pattern:
//
// processed := 0
// _ = rdi.Iterate(...,
Expand All @@ -45,8 +47,15 @@ type Iterator interface {
// log.Infof(ctx, "processed %d ranges", processed)
// },
// )
//
//
// When the query span is something other than keys.EverythingSpan, the page
// size is also approximately haw many extra keys/range descriptors we may
// be reading. Callers are expected to pick a page size accordingly
// (page sizes that are much larger than expected # of descriptors would
// lead to wasted work).
Iterate(
ctx context.Context, pageSize int, init func(),
ctx context.Context, pageSize int, init func(), span roachpb.Span,
fn func(descriptors ...roachpb.RangeDescriptor) error,
) error
}
Expand Down Expand Up @@ -74,31 +83,58 @@ func (i *iteratorImpl) Iterate(
ctx context.Context,
pageSize int,
init func(),
span roachpb.Span,
fn func(descriptors ...roachpb.RangeDescriptor) error,
) error {
rspan := roachpb.RSpan{
Key: keys.MustAddr(span.Key),
EndKey: keys.MustAddr(span.EndKey),
}

return i.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Inform the caller that we're starting a fresh attempt to page in
// range descriptors.
init()

// Iterate through meta{1,2} to pull out all the range descriptors.
// Bound the start key of the meta{1,2} scan as much as possible. If the
// start key < keys.Meta1KeyMax (we're also interested in the meta1
// range descriptor), start our scan at keys.MetaMin. If not, start it
// at the relevant range meta key -- in meta1 if the start key sits
// within meta2, in meta2 if the start key is an ordinary key.
//
// So what exactly is the "relevant range meta key"? Since keys in meta
// ranges are encoded using the end keys of range descriptors, we're
// looking for the lowest existing range meta key that's strictly
// greater than RangeMetaKey(start key).
rangeMetaKeyForStart := keys.RangeMetaKey(rspan.Key)
metaScanBoundsForStart, err := keys.MetaScanBounds(rangeMetaKeyForStart)
if err != nil {
return err
}
metaScanStartKey := metaScanBoundsForStart.Key.AsRawKey()

// Iterate through meta{1,2} to pull out relevant range descriptors.
// We'll keep scanning until we've found a range descriptor outside the
// scan of interest.
var lastRangeIDInMeta1 roachpb.RangeID
return txn.Iterate(ctx, keys.MetaMin, keys.MetaMax, pageSize,
return iterutil.Map(txn.Iterate(ctx, metaScanStartKey, keys.MetaMax, pageSize,
func(rows []kv.KeyValue) error {
descriptors := make([]roachpb.RangeDescriptor, 0, len(rows))
stopMetaIteration := false

var desc roachpb.RangeDescriptor
for _, row := range rows {
err := row.ValueProto(&desc)
if err != nil {
if err := row.ValueProto(&desc); err != nil {
return errors.Wrapf(err, "unable to unmarshal range descriptor from %s", row.Key)
}

// In small enough clusters it's possible for the same range
// descriptor to be stored in both meta1 and meta2. This
// happens when some range spans both the meta and the user
// keyspace. Consider when r1 is [/Min,
// /System/NodeLiveness); we'll store the range descriptor
// in both /Meta2/<r1.EndKey> and in /Meta1/KeyMax[1].
// In small enough clusters, it's possible for the same
// range descriptor to be stored in both meta1 and meta2.
// This happens when some range spans both the meta and the
// user keyspace. Consider when r1 is
// [/Min, /System/NodeLiveness); we'll store the range
// descriptor in both /Meta2/<r1.EndKey> and in
// /Meta1/KeyMax[1].
//
// As part of iterator we'll de-duplicate this descriptor
// away by checking whether we've seen it before in meta1.
Expand All @@ -111,15 +147,34 @@ func (i *iteratorImpl) Iterate(
continue
}

if _, err := desc.KeySpan().Intersect(rspan); err != nil {
// We're past the last range descriptor that overlaps
// with the given span.
stopMetaIteration = true
break
}

// This descriptor's span intersects with our query span, so
// collect it for the callback.
descriptors = append(descriptors, desc)

if keys.InMeta1(keys.RangeMetaKey(desc.StartKey)) {
lastRangeIDInMeta1 = desc.RangeID
}
}

// Invoke fn with the current chunk (of size ~blockSize) of
// range descriptors.
return fn(descriptors...)
})
if len(descriptors) != 0 {
// Invoke fn with the current chunk (of size ~pageSize) of
// range descriptors.
if err := fn(descriptors...); err != nil {
return err
}
}
if stopMetaIteration {
return iterutil.StopIteration() // we're done here
}
return nil
}),
)
})
}
Loading

0 comments on commit 98ab461

Please sign in to comment.