diff --git a/pkg/ccl/kvccl/kvtenantccl/connector.go b/pkg/ccl/kvccl/kvtenantccl/connector.go index 88b77a34be22..7ae2e7a186c2 100644 --- a/pkg/ccl/kvccl/kvtenantccl/connector.go +++ b/pkg/ccl/kvccl/kvtenantccl/connector.go @@ -536,9 +536,12 @@ func (c *Connector) NewIterator( curIdx: 0, }, nil } - // TODO(arul): We probably don't want to treat all errors here as "soft". - // Soft RPC error. Drop client and retry. log.Warningf(ctx, "error consuming GetRangeDescriptors RPC: %v", err) + if grpcutil.IsAuthError(err) { + // Authentication or authorization error. Propagate. + return nil, err + } + // Soft RPC error. Drop client and retry. c.tryForgetClient(ctx, client) break } diff --git a/pkg/ccl/kvccl/kvtenantccl/tenant_scan_range_descriptors_test.go b/pkg/ccl/kvccl/kvtenantccl/tenant_scan_range_descriptors_test.go index 47e133145eab..18578b5c14d7 100644 --- a/pkg/ccl/kvccl/kvtenantccl/tenant_scan_range_descriptors_test.go +++ b/pkg/ccl/kvccl/kvtenantccl/tenant_scan_range_descriptors_test.go @@ -16,18 +16,16 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/rangedesc" "github.com/stretchr/testify/require" ) -// TestScanRangeDescriptors is an integration test to ensure that tenants can -// scan range descriptors iff they correspond to tenant owned ranges. -func TestScanRangeDescriptors(t *testing.T) { - defer leaktest.AfterTest(t)() - - ctx := context.Background() +func setup( + t *testing.T, ctx context.Context, +) (*testcluster.TestCluster, serverutils.TestTenantInterface, rangedesc.IteratorFactory) { tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ ServerArgs: base.TestServerArgs{ DisableDefaultTestTenant: true, // we're going to manually add tenants @@ -38,16 +36,26 @@ func TestScanRangeDescriptors(t *testing.T) { }, }, }) - defer tc.Stopper().Stop(ctx) ten2ID := roachpb.MustMakeTenantID(2) tenant2, err := tc.Server(0).StartTenant(ctx, base.TestTenantArgs{ TenantID: ten2ID, }) require.NoError(t, err) + return tc, tenant2, tenant2.RangeDescIteratorFactory().(rangedesc.IteratorFactory) +} + +// TestScanRangeDescriptors is an integration test to ensure that tenants can +// scan range descriptors iff they correspond to tenant owned ranges. +func TestScanRangeDescriptors(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc, tenant2, iteratorFactory := setup(t, ctx) + defer tc.Stopper().Stop(ctx) // Split some ranges within tenant2 that we'll scan over. - ten2Codec := keys.MakeSQLCodec(ten2ID) + ten2Codec := tenant2.Codec() ten2Split1 := append(ten2Codec.TenantPrefix(), 'a') ten2Split2 := append(ten2Codec.TenantPrefix(), 'b') { @@ -56,7 +64,6 @@ func TestScanRangeDescriptors(t *testing.T) { tc.SplitRangeOrFatal(t, ten2Codec.TenantPrefix().PrefixEnd()) // Last range } - iteratorFactory := tenant2.RangeDescIteratorFactory().(rangedesc.IteratorFactory) iter, err := iteratorFactory.NewIterator(ctx, ten2Codec.TenantSpan()) require.NoError(t, err) @@ -112,3 +119,14 @@ func TestScanRangeDescriptors(t *testing.T) { rangeDescs[numRanges-1].StartKey, ) } + +func TestScanRangeDescriptorsOutsideTenantKeyspace(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + tc, _, iteratorFactory := setup(t, ctx) + defer tc.Stopper().Stop(ctx) + + _, err := iteratorFactory.NewIterator(ctx, keys.EverythingSpan) + require.ErrorContains(t, err, "requested key span /M{in-ax} not fully contained in tenant keyspace /Tenant/{2-3}") +} diff --git a/pkg/server/node_http_router_test.go b/pkg/server/node_http_router_test.go index fc771ee82099..d45d681c51ce 100644 --- a/pkg/server/node_http_router_test.go +++ b/pkg/server/node_http_router_test.go @@ -53,7 +53,7 @@ func TestRouteToNode(t *testing.T) { sourceServerID: 1, nodeIDRequestedInCookie: "local", expectStatusCode: 200, - expectRegex: regexp.MustCompile(`ranges_underreplicated{store="2"}`), + expectRegex: regexp.MustCompile(`ranges_underreplicated{store="2",node_id="2"}`), }, { name: "remote _status/vars on node 2 from node 1 using cookie", @@ -61,7 +61,7 @@ func TestRouteToNode(t *testing.T) { sourceServerID: 0, nodeIDRequestedInCookie: "2", expectStatusCode: 200, - expectRegex: regexp.MustCompile(`ranges_underreplicated{store="2"}`), + expectRegex: regexp.MustCompile(`ranges_underreplicated{store="2",node_id="2"}`), }, { name: "remote _status/vars on node 1 from node 2 using cookie", @@ -69,7 +69,7 @@ func TestRouteToNode(t *testing.T) { sourceServerID: 1, nodeIDRequestedInCookie: "1", expectStatusCode: 200, - expectRegex: regexp.MustCompile(`ranges_underreplicated{store="1"}`), + expectRegex: regexp.MustCompile(`ranges_underreplicated{store="1",node_id="1"}`), }, { name: "remote _status/vars on node 2 from node 1 using query param", @@ -77,7 +77,7 @@ func TestRouteToNode(t *testing.T) { sourceServerID: 0, nodeIDRequestedInQueryParam: "2", expectStatusCode: 200, - expectRegex: regexp.MustCompile(`ranges_underreplicated{store="2"}`), + expectRegex: regexp.MustCompile(`ranges_underreplicated{store="2",node_id="2"}`), }, { name: "query param overrides cookie", @@ -86,7 +86,7 @@ func TestRouteToNode(t *testing.T) { nodeIDRequestedInCookie: "local", nodeIDRequestedInQueryParam: "2", expectStatusCode: 200, - expectRegex: regexp.MustCompile(`ranges_underreplicated{store="2"}`), + expectRegex: regexp.MustCompile(`ranges_underreplicated{store="2",node_id="2"}`), }, { name: "remote / root HTML on node 2 from node 1 using cookie", diff --git a/pkg/server/status/recorder.go b/pkg/server/status/recorder.go index 72cf79727044..7874aef443b4 100644 --- a/pkg/server/status/recorder.go +++ b/pkg/server/status/recorder.go @@ -227,6 +227,12 @@ func (mr *MetricsRecorder) AddNode( nodeIDGauge.Update(int64(desc.NodeID)) reg.AddMetric(nodeIDGauge) reg.AddLabel("tenant", mr.tenantNameContainer) + reg.AddLabel("node_id", strconv.Itoa(int(desc.NodeID))) + // We assume that all stores have been added to the registry + // prior to calling `AddNode`. + for _, s := range mr.mu.storeRegistries { + s.AddLabel("node_id", strconv.Itoa(int(desc.NodeID))) + } } // AddStore adds the Registry from the provided store as a store-level registry diff --git a/pkg/server/status/recorder_test.go b/pkg/server/status/recorder_test.go index c9bbc7a77a71..af4b2bd10eb0 100644 --- a/pkg/server/status/recorder_test.go +++ b/pkg/server/status/recorder_test.go @@ -99,10 +99,10 @@ func (fs fakeStore) Registry() *metric.Registry { return fs.registry } -func TestMetricsRecorderTenants(t *testing.T) { +func TestMetricsRecorderLabels(t *testing.T) { defer leaktest.AfterTest(t)() nodeDesc := roachpb.NodeDescriptor{ - NodeID: roachpb.NodeID(1), + NodeID: roachpb.NodeID(7), } reg1 := metric.NewRegistry() manual := timeutil.NewManualTime(timeutil.Unix(0, 100)) @@ -118,7 +118,7 @@ func TestMetricsRecorderTenants(t *testing.T) { recorder.AddNode(reg1, nodeDesc, 50, "foo:26257", "foo:26258", "foo:5432") nodeDescTenant := roachpb.NodeDescriptor{ - NodeID: roachpb.NodeID(1), + NodeID: roachpb.NodeID(7), } regTenant := metric.NewRegistry() stTenant := cluster.MakeTestingClusterSettings() @@ -154,15 +154,15 @@ func TestMetricsRecorderTenants(t *testing.T) { err = recorder.PrintAsText(buf) require.NoError(t, err) - require.Contains(t, buf.String(), `some_metric{tenant="system"} 123`) - require.Contains(t, buf.String(), `some_metric{tenant="application"} 456`) + require.Contains(t, buf.String(), `some_metric{tenant="system",node_id="7"} 123`) + require.Contains(t, buf.String(), `some_metric{tenant="application",node_id="7"} 456`) bufTenant := bytes.NewBuffer([]byte{}) err = recorderTenant.PrintAsText(bufTenant) require.NoError(t, err) - require.NotContains(t, bufTenant.String(), `some_metric{tenant="system"} 123`) - require.Contains(t, bufTenant.String(), `some_metric{tenant="application"} 456`) + require.NotContains(t, bufTenant.String(), `some_metric{tenant="system",node_id="7"} 123`) + require.Contains(t, bufTenant.String(), `some_metric{tenant="application",node_id="7"} 456`) // Update app name in container and ensure // output changes accordingly. @@ -172,15 +172,15 @@ func TestMetricsRecorderTenants(t *testing.T) { err = recorder.PrintAsText(buf) require.NoError(t, err) - require.Contains(t, buf.String(), `some_metric{tenant="system"} 123`) - require.Contains(t, buf.String(), `some_metric{tenant="application2"} 456`) + require.Contains(t, buf.String(), `some_metric{tenant="system",node_id="7"} 123`) + require.Contains(t, buf.String(), `some_metric{tenant="application2",node_id="7"} 456`) bufTenant = bytes.NewBuffer([]byte{}) err = recorderTenant.PrintAsText(bufTenant) require.NoError(t, err) - require.NotContains(t, bufTenant.String(), `some_metric{tenant="system"} 123`) - require.Contains(t, bufTenant.String(), `some_metric{tenant="application2"} 456`) + require.NotContains(t, bufTenant.String(), `some_metric{tenant="system",node_id="7"} 123`) + require.Contains(t, bufTenant.String(), `some_metric{tenant="application2",node_id="7"} 456`) // ======================================== // Verify that the recorder processes tenant time series registries @@ -190,17 +190,17 @@ func TestMetricsRecorderTenants(t *testing.T) { // System tenant metrics { Name: "cr.node.node-id", - Source: "1", + Source: "7", Datapoints: []tspb.TimeSeriesDatapoint{ { TimestampNanos: manual.Now().UnixNano(), - Value: float64(1), + Value: float64(7), }, }, }, { Name: "cr.node.some_metric", - Source: "1", + Source: "7", Datapoints: []tspb.TimeSeriesDatapoint{ { TimestampNanos: manual.Now().UnixNano(), @@ -211,7 +211,7 @@ func TestMetricsRecorderTenants(t *testing.T) { // App tenant metrics { Name: "cr.node.node-id", - Source: "1-123", + Source: "7-123", Datapoints: []tspb.TimeSeriesDatapoint{ { TimestampNanos: manual.Now().UnixNano(), @@ -221,7 +221,7 @@ func TestMetricsRecorderTenants(t *testing.T) { }, { Name: "cr.node.some_metric", - Source: "1-123", + Source: "7-123", Datapoints: []tspb.TimeSeriesDatapoint{ { TimestampNanos: manual.Now().UnixNano(), diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go index a3d52af4b6ac..1ed158db2f17 100644 --- a/pkg/server/status_test.go +++ b/pkg/server/status_test.go @@ -1339,20 +1339,20 @@ func TestStatusVarsTxnMetrics(t *testing.T) { if err != nil { t.Fatal(err) } - if !bytes.Contains(body, []byte("sql_txn_begin_count{tenant=\"system\"} 1")) { - t.Errorf("expected `sql_txn_begin_count{tenant=\"system\"} 1`, got: %s", body) + if !bytes.Contains(body, []byte("sql_txn_begin_count{tenant=\"system\",node_id=\"1\"} 1")) { + t.Errorf("expected `sql_txn_begin_count{tenant=\"system\",node_id=\"1\"} 1`, got: %s", body) } - if !bytes.Contains(body, []byte("sql_restart_savepoint_count{tenant=\"system\"} 1")) { - t.Errorf("expected `sql_restart_savepoint_count{tenant=\"system\"} 1`, got: %s", body) + if !bytes.Contains(body, []byte("sql_restart_savepoint_count{tenant=\"system\",node_id=\"1\"} 1")) { + t.Errorf("expected `sql_restart_savepoint_count{tenant=\"system\",node_id=\"1\"} 1`, got: %s", body) } - if !bytes.Contains(body, []byte("sql_restart_savepoint_release_count{tenant=\"system\"} 1")) { - t.Errorf("expected `sql_restart_savepoint_release_count{tenant=\"system\"} 1`, got: %s", body) + if !bytes.Contains(body, []byte("sql_restart_savepoint_release_count{tenant=\"system\",node_id=\"1\"} 1")) { + t.Errorf("expected `sql_restart_savepoint_release_count{tenant=\"system\",node_id=\"1\"} 1`, got: %s", body) } - if !bytes.Contains(body, []byte("sql_txn_commit_count{tenant=\"system\"} 1")) { - t.Errorf("expected `sql_txn_commit_count{tenant=\"system\"} 1`, got: %s", body) + if !bytes.Contains(body, []byte("sql_txn_commit_count{tenant=\"system\",node_id=\"1\"} 1")) { + t.Errorf("expected `sql_txn_commit_count{tenant=\"system\",node_id=\"1\"} 1`, got: %s", body) } - if !bytes.Contains(body, []byte("sql_txn_rollback_count{tenant=\"system\"} 0")) { - t.Errorf("expected `sql_txn_rollback_count{tenant=\"system\"} 0`, got: %s", body) + if !bytes.Contains(body, []byte("sql_txn_rollback_count{tenant=\"system\",node_id=\"1\"} 0")) { + t.Errorf("expected `sql_txn_rollback_count{tenant=\"system\",node_id=\"1\"} 0`, got: %s", body) } }