Skip to content

Commit

Permalink
kvconnectorccl: allow secondary tenants to prefetch range lookups
Browse files Browse the repository at this point in the history
This patch permits the tenant connector to request more than 0 ranges to be
prefetched. In order to enable this, we add logic in the implementation of the
RangeLookup RPC to filter any results which are not intended for this tenant.

Fixes cockroachdb#91433

Release note: None
  • Loading branch information
ajwerner committed Nov 21, 2022
1 parent f0554bc commit 298b016
Show file tree
Hide file tree
Showing 8 changed files with 247 additions and 9 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ go_test(
"main_test.go",
"setting_overrides_test.go",
"tenant_kv_test.go",
"tenant_range_lookup_test.go",
"tenant_trace_test.go",
"tenant_upgrade_test.go",
],
Expand All @@ -59,7 +60,10 @@ go_test(
"//pkg/config",
"//pkg/gossip",
"//pkg/jobs",
"//pkg/keys",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvclient/kvtenant",
"//pkg/kv/kvclient/rangecache",
"//pkg/kv/kvserver",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/roachpb",
Expand Down
5 changes: 1 addition & 4 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,10 +389,7 @@ func (c *Connector) RangeLookup(
// for more discussion on the choice of ReadConsistency and its
// implications.
ReadConsistency: rc,
// Until we add protection in the Internal service implementation to
// prevent prefetching from traversing into RangeDescriptors owned by
// other tenants, we must disable prefetching.
PrefetchNum: 0,
PrefetchNum: kvcoord.RangeLookupPrefetchCount,
PrefetchReverse: useReverseScan,
})
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/kvccl/kvtenantccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
Expand Down Expand Up @@ -313,7 +314,7 @@ func TestConnectorRangeLookup(t *testing.T) {
// Validate request.
assert.Equal(t, roachpb.RKey("a"), req.Key)
assert.Equal(t, roachpb.READ_UNCOMMITTED, req.ReadConsistency)
assert.Equal(t, int64(0), req.PrefetchNum)
assert.Equal(t, int64(kvcoord.RangeLookupPrefetchCount), req.PrefetchNum)
assert.Equal(t, false, req.PrefetchReverse)

// Respond.
Expand Down
81 changes: 81 additions & 0 deletions pkg/ccl/kvccl/kvtenantccl/tenant_range_lookup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package kvtenantccl_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangecache"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

// TestRangeLookupPrefetchFiltering is an integration test to ensure that
// range results are filtered for the client.
func TestRangeLookupPrefetchFiltering(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
DisableDefaultTestTenant: true, // we're going to manually add tenants
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
},
},
},
})
defer tc.Stopper().Stop(ctx)

ten2ID := roachpb.MustMakeTenantID(2)
tenant2, err := tc.Server(0).StartTenant(ctx, base.TestTenantArgs{
TenantID: ten2ID,
})
require.NoError(t, err)

// Split some ranges within tenant2 that we'll want to see in prefetch.
ten2Codec := keys.MakeSQLCodec(ten2ID)
ten2Split1 := append(ten2Codec.TenantPrefix(), 'a')
ten2Split2 := append(ten2Codec.TenantPrefix(), 'b')
{
tc.SplitRangeOrFatal(t, ten2Split1)
tc.SplitRangeOrFatal(t, ten2Split2)
}

// Split some ranges for the tenant which comes after tenant2.
{
ten3Codec := keys.MakeSQLCodec(roachpb.MustMakeTenantID(3))
tc.SplitRangeOrFatal(t, ten3Codec.TenantPrefix())
tc.SplitRangeOrFatal(t, append(ten3Codec.TenantPrefix(), 'b'))
tc.SplitRangeOrFatal(t, append(ten3Codec.TenantPrefix(), 'c'))
}

// Do the fetch and make sure we prefetch all the ranges we should see,
// and none of the ranges we should not.
db := tenant2.DistSenderI().(*kvcoord.DistSender).RangeDescriptorCache().DB()
prefixRKey := keys.MustAddr(ten2Codec.TenantPrefix())
res, prefetch, err := db.RangeLookup(
ctx, prefixRKey,
rangecache.ReadFromLeaseholder, false, /* useReverseScan */
)
require.NoError(t, err)
require.Len(t, res, 1)
require.Equal(t, prefixRKey, res[0].StartKey)
require.Len(t, prefetch, 2)
require.Equal(t, keys.MustAddr(ten2Split1), prefetch[0].StartKey)
require.Equal(t, keys.MustAddr(ten2Split2), prefetch[1].StartKey)
}
7 changes: 4 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,9 @@ var CanSendToFollower = func(
const (
// The default limit for asynchronous senders.
defaultSenderConcurrency = 1024
// The maximum number of range descriptors to prefetch during range lookups.
rangeLookupPrefetchCount = 8
// RangeLookupPrefetchCount is the maximum number of range descriptors to prefetch
// during range lookups.
RangeLookupPrefetchCount = 8
// The maximum number of times a replica is retried when it repeatedly returns
// stale lease info.
sameReplicaRetryLimit = 10
Expand Down Expand Up @@ -560,7 +561,7 @@ func (ds *DistSender) RangeLookup(
// RangeDescriptor is not on the first range we send the lookup too, we'll
// still find it when we scan to the next range. This addresses the issue
// described in #18032 and #16266, allowing us to support meta2 splits.
return kv.RangeLookup(ctx, ds, key.AsRawKey(), rc, rangeLookupPrefetchCount, useReverseScan)
return kv.RangeLookup(ctx, ds, key.AsRawKey(), rc, RangeLookupPrefetchCount, useReverseScan)
}

// FirstRange implements the RangeDescriptorDB interface.
Expand Down
2 changes: 2 additions & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ go_test(
"status_ext_test.go",
"status_test.go",
"sticky_engine_test.go",
"tenant_range_lookup_test.go",
"testserver_test.go",
"user_test.go",
"version_cluster_test.go",
Expand Down Expand Up @@ -462,6 +463,7 @@ go_test(
"//pkg/upgrade/upgrades",
"//pkg/util",
"//pkg/util/admission",
"//pkg/util/encoding",
"//pkg/util/envutil",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
Expand Down
36 changes: 35 additions & 1 deletion pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1298,6 +1298,40 @@ func setupSpanForIncomingRPC(
}
}

func tenantPrefix(tenID roachpb.TenantID) roachpb.RSpan {
// TODO(nvanbenschoten): consider caching this span.
prefix := roachpb.RKey(keys.MakeTenantPrefix(tenID))
return roachpb.RSpan{
Key: prefix,
EndKey: prefix.PrefixEnd(),
}
}

// filterRangeLookupResultsForTenant extracts the tenant ID from the context.
// It filters descs to only include the prefix which have a start key in the
// tenant's span. If there is no tenant in the context, it will filter all
// the descriptors.
func filterRangeLookupResponseForTenant(
ctx context.Context, descs []roachpb.RangeDescriptor,
) []roachpb.RangeDescriptor {
tenID, ok := roachpb.TenantFromContext(ctx)
if !ok {
// If we do not know the tenant, don't permit any pre-fetching.
return []roachpb.RangeDescriptor{}
}
rs := tenantPrefix(tenID)
truncated := descs[:0]
// We say that any range which has a start key within the tenant prefix is
// fair game for the tenant to know about.
for _, d := range descs {
if !rs.ContainsKey(d.StartKey) {
break
}
truncated = append(truncated, d)
}
return truncated
}

// RangeLookup implements the roachpb.InternalServer interface.
func (n *Node) RangeLookup(
ctx context.Context, req *roachpb.RangeLookupRequest,
Expand All @@ -1324,7 +1358,7 @@ func (n *Node) RangeLookup(
resp.Error = roachpb.NewError(err)
} else {
resp.Descriptors = rs
resp.PrefetchedDescriptors = preRs
resp.PrefetchedDescriptors = filterRangeLookupResponseForTenant(ctx, preRs)
}
return resp, nil
}
Expand Down
118 changes: 118 additions & 0 deletions pkg/server/tenant_range_lookup_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package server

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/stretchr/testify/require"
)

// TestFilterRangeLookupResponseForTenant unit tests the logic to filter
// RangeLookup connector requests.
func TestFilterRangeLookupResponseForTenant(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
mkKey := func(tenant uint64, str string) roachpb.RKey {
codec := keys.MakeSQLCodec(roachpb.MustMakeTenantID(tenant))
return encoding.EncodeStringAscending(codec.TenantPrefix(), str)
}
mkRangeDescriptor := func(start, end roachpb.RKey) roachpb.RangeDescriptor {
return *roachpb.NewRangeDescriptor(1, start, end, roachpb.MakeReplicaSet(nil))
}
for _, tc := range []struct {
name string
id roachpb.TenantID
descs []roachpb.RangeDescriptor
exp int
skipTenantContext bool
}{
// tenant 1, the "system tenant" can see everything.
{
name: "tenant 1 is special",
id: roachpb.MustMakeTenantID(1),
descs: []roachpb.RangeDescriptor{
mkRangeDescriptor(mkKey(1, "a"), mkKey(1, "b")),
mkRangeDescriptor(mkKey(1, "b"), mkKey(1, "z")),
mkRangeDescriptor(mkKey(2, "z"), mkKey(2, "")),
mkRangeDescriptor(mkKey(2, ""), mkKey(2, "a")),
mkRangeDescriptor(mkKey(2, "a"), mkKey(3, "")),
mkRangeDescriptor(mkKey(3, ""), mkKey(4, "")),
},
exp: 6,
},
// tenant 2 is a normal secondary tenant and can only see its own data.
{
name: "filter to tenant data",
id: roachpb.MustMakeTenantID(2),
descs: []roachpb.RangeDescriptor{
mkRangeDescriptor(mkKey(2, "a"), mkKey(2, "b")),
mkRangeDescriptor(mkKey(2, "b"), mkKey(2, "z")),
mkRangeDescriptor(mkKey(2, "z"), mkKey(3, "")),
mkRangeDescriptor(mkKey(3, ""), mkKey(3, "a")),
},
exp: 3,
},
// tenant 2 is a normal secondary tenant and can only see its own data,
// but this includes the case where the range overlaps with multiple
// tenants.
{
name: "filter to tenant data even though range crosses tenants",
id: roachpb.MustMakeTenantID(2),
descs: []roachpb.RangeDescriptor{
mkRangeDescriptor(mkKey(2, "a"), mkKey(2, "b")),
mkRangeDescriptor(mkKey(2, "b"), mkKey(2, "z")),
mkRangeDescriptor(mkKey(2, "z"), mkKey(4, "")),
mkRangeDescriptor(mkKey(4, ""), mkKey(4, "a")),
},
exp: 3,
},
// If there is no tenant ID in the context, only one result should be
// returned.
{
id: roachpb.MustMakeTenantID(2),
descs: []roachpb.RangeDescriptor{
mkRangeDescriptor(mkKey(2, "a"), mkKey(2, "b")),
mkRangeDescriptor(mkKey(2, "b"), mkKey(2, "z")),
mkRangeDescriptor(mkKey(2, "z"), mkKey(3, "")),
mkRangeDescriptor(mkKey(3, ""), mkKey(3, "a")),
},
skipTenantContext: true,
exp: 0,
},
// Other code should prevent a request that might return descriptors from
// another tenant, however, defensively this code should also filter them.
{
id: roachpb.MustMakeTenantID(3),
descs: []roachpb.RangeDescriptor{
mkRangeDescriptor(mkKey(2, "a"), mkKey(2, "b")),
mkRangeDescriptor(mkKey(2, "b"), mkKey(2, "z")),
mkRangeDescriptor(mkKey(2, "z"), mkKey(3, "")),
mkRangeDescriptor(mkKey(3, ""), mkKey(3, "a")),
},
exp: 0,
},
} {
tenantCtx := ctx
if !tc.skipTenantContext {
tenantCtx = roachpb.NewContextForTenant(ctx, tc.id)
}
got := filterRangeLookupResponseForTenant(tenantCtx, tc.descs)
require.Len(t, got, tc.exp)
require.Equal(t, tc.descs[:tc.exp], got)
}
}

0 comments on commit 298b016

Please sign in to comment.