Skip to content

Commit

Permalink
keys: create SpanAddr utility
Browse files Browse the repository at this point in the history
This was badly needed in a number of places. I found myself reaching for
it again, so I figured it was time to add it.
  • Loading branch information
nvanbenschoten committed Jul 29, 2020
1 parent f4cdaab commit 2632df2
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 50 deletions.
18 changes: 18 additions & 0 deletions pkg/keys/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,24 @@ func AddrUpperBound(k roachpb.Key) (roachpb.RKey, error) {
return rk, nil
}

// SpanAddr is like Addr, but it takes a Span instead of a single key and
// applies the key transformation to the start and end keys in the span,
// returning an RSpan.
func SpanAddr(span roachpb.Span) (roachpb.RSpan, error) {
rk, err := Addr(span.Key)
if err != nil {
return roachpb.RSpan{}, err
}
var rek roachpb.RKey
if len(span.EndKey) > 0 {
rek, err = Addr(span.EndKey)
if err != nil {
return roachpb.RSpan{}, err
}
}
return roachpb.RSpan{Key: rk, EndKey: rek}, nil
}

// RangeMetaKey returns a range metadata (meta1, meta2) indexing key for the
// given key.
//
Expand Down
64 changes: 64 additions & 0 deletions pkg/keys/keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,70 @@ func TestKeyAddressError(t *testing.T) {
}
}

func TestSpanAddress(t *testing.T) {
testCases := []struct {
span roachpb.Span
expAddress roachpb.RSpan
}{
// Without EndKey.
{
roachpb.Span{},
roachpb.RSpan{},
},
{
roachpb.Span{Key: roachpb.Key{}},
roachpb.RSpan{Key: roachpb.RKeyMin},
},
{
roachpb.Span{Key: roachpb.Key("123")},
roachpb.RSpan{Key: roachpb.RKey("123")},
},
{
roachpb.Span{Key: RangeDescriptorKey(roachpb.RKey("foo"))},
roachpb.RSpan{Key: roachpb.RKey("foo")},
},
{
roachpb.Span{Key: TransactionKey(roachpb.Key("baz"), uuid.MakeV4())},
roachpb.RSpan{Key: roachpb.RKey("baz")},
},
{
roachpb.Span{Key: TransactionKey(roachpb.KeyMax, uuid.MakeV4())},
roachpb.RSpan{Key: roachpb.RKeyMax},
},
{
roachpb.Span{Key: RangeDescriptorKey(roachpb.RKey(TransactionKey(roachpb.Key("doubleBaz"), uuid.MakeV4())))},
roachpb.RSpan{Key: roachpb.RKey("doubleBaz")},
},
// With EndKey.
{
roachpb.Span{Key: roachpb.Key("123"), EndKey: roachpb.Key("456")},
roachpb.RSpan{Key: roachpb.RKey("123"), EndKey: roachpb.RKey("456")},
},
{
roachpb.Span{Key: RangeDescriptorKey(roachpb.RKey("foo")), EndKey: RangeDescriptorKey(roachpb.RKey("fop"))},
roachpb.RSpan{Key: roachpb.RKey("foo"), EndKey: roachpb.RKey("fop")},
},
{
roachpb.Span{Key: TransactionKey(roachpb.Key("bar"), uuid.MakeV4()), EndKey: TransactionKey(roachpb.Key("baz"), uuid.MakeV4())},
roachpb.RSpan{Key: roachpb.RKey("bar"), EndKey: roachpb.RKey("baz")},
},
{
roachpb.Span{
Key: RangeDescriptorKey(roachpb.RKey(TransactionKey(roachpb.Key("doubleBar"), uuid.MakeV4()))),
EndKey: RangeDescriptorKey(roachpb.RKey(TransactionKey(roachpb.Key("doubleBaz"), uuid.MakeV4()))),
},
roachpb.RSpan{Key: roachpb.RKey("doubleBar"), EndKey: roachpb.RKey("doubleBaz")},
},
}
for i, test := range testCases {
if spanAddr, err := SpanAddr(test.span); err != nil {
t.Errorf("%d: %v", i, err)
} else if !spanAddr.Equal(test.expAddress) {
t.Errorf("%d: expected address for span %q doesn't match %q", i, test.span, test.expAddress)
}
}
}

func TestRangeMetaKey(t *testing.T) {
testCases := []struct {
key, expKey roachpb.RKey
Expand Down
7 changes: 1 addition & 6 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,10 @@ func (ds *DistSender) RangeFeed(
ctx, sp := tracing.EnsureChildSpan(ctx, ds.AmbientContext.Tracer, "dist sender")
defer sp.Finish()

startRKey, err := keys.Addr(span.Key)
rs, err := keys.SpanAddr(span)
if err != nil {
return err
}
endRKey, err := keys.Addr(span.EndKey)
if err != nil {
return err
}
rs := roachpb.RSpan{Key: startRKey, EndKey: endRKey}

g := ctxgroup.WithContext(ctx)
// Goroutine that processes subdivided ranges and creates a rangefeed for
Expand Down
8 changes: 1 addition & 7 deletions pkg/kv/kvserver/replica_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,7 @@ func (r *Replica) RangeFeed(
}
ctx := r.AnnotateCtx(stream.Context())

var rSpan roachpb.RSpan
var err error
rSpan.Key, err = keys.Addr(args.Span.Key)
if err != nil {
return roachpb.NewError(err)
}
rSpan.EndKey, err = keys.Addr(args.Span.EndKey)
rSpan, err := keys.SpanAddr(args.Span)
if err != nil {
return roachpb.NewError(err)
}
Expand Down
8 changes: 1 addition & 7 deletions pkg/server/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,13 +603,7 @@ func (s *adminServer) TableDetails(
// data. Then, we count the number of ranges that make up that key span.
{
tableSpan := generateTableSpan(tableID)
tableRSpan := roachpb.RSpan{}
var err error
tableRSpan.Key, err = keys.Addr(tableSpan.Key)
if err != nil {
return nil, s.serverError(err)
}
tableRSpan.EndKey, err = keys.Addr(tableSpan.EndKey)
tableRSpan, err := keys.SpanAddr(tableSpan)
if err != nil {
return nil, s.serverError(err)
}
Expand Down
20 changes: 8 additions & 12 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,23 +802,19 @@ func (dsp *DistSQLPlanner) PartitionSpans(
nodeMap := make(map[roachpb.NodeID]int)
it := planCtx.spanIter
for _, span := range spans {
// rspan is the span we are currently partitioning.
var rspan roachpb.RSpan
var err error
if rspan.Key, err = keys.Addr(span.Key); err != nil {
return nil, err
}
if rspan.EndKey, err = keys.Addr(span.EndKey); err != nil {
// rSpan is the span we are currently partitioning.
rSpan, err := keys.SpanAddr(span)
if err != nil {
return nil, err
}

var lastNodeID roachpb.NodeID
// lastKey maintains the EndKey of the last piece of `span`.
lastKey := rspan.Key
lastKey := rSpan.Key
if log.V(1) {
log.Infof(ctx, "partitioning span %s", span)
}
// We break up rspan into its individual ranges (which may or
// We break up rSpan into its individual ranges (which may or
// may not be on separate nodes). We then create "partitioned
// spans" using the end keys of these individual ranges.
for it.Seek(ctx, span, kvcoord.Ascending); ; it.Next(ctx) {
Expand All @@ -845,8 +841,8 @@ func (dsp *DistSQLPlanner) PartitionSpans(

// Limit the end key to the end of the span we are resolving.
endKey := desc.EndKey
if rspan.EndKey.Less(endKey) {
endKey = rspan.EndKey
if rSpan.EndKey.Less(endKey) {
endKey = rSpan.EndKey
}

nodeID := replDesc.NodeID
Expand Down Expand Up @@ -882,7 +878,7 @@ func (dsp *DistSQLPlanner) PartitionSpans(
})
}

if !endKey.Less(rspan.EndKey) {
if !endKey.Less(rSpan.EndKey) {
// Done.
break
}
Expand Down
9 changes: 2 additions & 7 deletions pkg/sql/execinfra/readerbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,11 @@ func MisplannedRanges(
log.VEvent(ctx, 2, "checking range cache to see if range info updates should be communicated to the gateway")
misplanned := make(map[roachpb.RangeID]struct{})
for _, sp := range spans {
rstart, err := keys.Addr(sp.Key)
rSpan, err := keys.SpanAddr(sp)
if err != nil {
panic(err)
}
rend, err := keys.Addr(sp.EndKey)
if err != nil {
panic(err)
}
rsp := roachpb.RSpan{Key: rstart, EndKey: rend}
overlapping := rdc.GetCachedOverlapping(ctx, rsp)
overlapping := rdc.GetCachedOverlapping(ctx, rSpan)

for _, ri := range overlapping {
if _, ok := misplanned[ri.Desc.RangeID]; ok {
Expand Down
15 changes: 4 additions & 11 deletions pkg/sql/physicalplan/span_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,21 +190,14 @@ func (it *spanResolverIterator) Error() error {
func (it *spanResolverIterator) Seek(
ctx context.Context, span roachpb.Span, scanDir kvcoord.ScanDirection,
) {
var key, endKey roachpb.RKey
var err error
if key, err = keys.Addr(span.Key); err != nil {
it.err = err
return
}
if endKey, err = keys.Addr(span.EndKey); err != nil {
rSpan, err := keys.SpanAddr(span)
if err != nil {
it.err = err
return
}

oldDir := it.dir
it.curSpan = roachpb.RSpan{
Key: key,
EndKey: endKey,
}
it.curSpan = rSpan
it.dir = scanDir

var seekKey roachpb.RKey
Expand Down

0 comments on commit 2632df2

Please sign in to comment.