Merge #98640 #99143
98640: server: add `node_id` label to _status/vars output r=aadityasondhi a=dhartunian

Previously, the output of the Prometheus metrics via `_status/vars` did not include any node labels. This made monitoring large clusters harder: additional scrape-side configuration was needed to attach a node ID to the metrics, and that configuration has to be updated whenever nodes join or leave the cluster.

This change adds a `node_id` Prometheus label, matching the current node's ID, to the metrics we output. Since `_status/vars` is served by a single node, there is only ever one appropriate value.
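
For example, a metric that was previously emitted as `sql_txn_commit_count{tenant="system"} 1` now appears as `sql_txn_commit_count{tenant="system",node_id="1"} 1` (see the updated test expectations further down).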

Secondary tenants will mark their metrics with either the nodeID of the shared-process system tenant, or the instanceID of the tenant process.
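
In the updated recorder test below, for instance, the same metric is expected as `some_metric{tenant="system",node_id="7"} 123` from the system tenant's registry and as `some_metric{tenant="application",node_id="7"} 456` from the secondary tenant's.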

Resolves: #94763
Epic: None

Release note (ops change): Prometheus metrics available at the `_status/vars` path now contain a `node_id` label that identifies the node they were scraped from.

99143: multitenant: NewIterator connector infinite retry loop r=stevendanna a=ecwall

Fixes #98822

This change fixes an infinite retry loop in `Connector.NewIterator` that would occur when the `GetRangeDescriptors` stream returned an auth error. An example trigger would be passing in a span that was outside of the calling tenant's keyspace.
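
(The new `TestScanRangeDescriptorsOutsideTenantKeyspace` test below exercises exactly this case by requesting `keys.EverythingSpan` from a secondary tenant and asserting that the error is returned rather than retried.)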

Now `NewIterator` correctly propagates auth errors up to the caller.
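
In outline, the consuming loop still treats most RPC failures as soft (drop the client and retry), but returns as soon as the failure is an authentication/authorization error. Below is a minimal, self-contained Go sketch of that pattern; the names (`fetchWithRetry`, `isAuthError`, `fetch`) are hypothetical stand-ins, not the actual `Connector` code, which uses `grpcutil.IsAuthError` as shown in the diff further down.

    // Sketch only: retry soft RPC errors, but propagate auth errors to the caller.
    package main

    import (
    	"context"
    	"fmt"
    	"log"

    	"google.golang.org/grpc/codes"
    	"google.golang.org/grpc/status"
    )

    // isAuthError stands in for CockroachDB's grpcutil.IsAuthError.
    func isAuthError(err error) bool {
    	code := status.Code(err)
    	return code == codes.Unauthenticated || code == codes.PermissionDenied
    }

    // fetchWithRetry keeps retrying the stream on soft errors, but returns
    // authentication/authorization errors instead of looping forever.
    func fetchWithRetry(ctx context.Context, fetch func(context.Context) ([]string, error)) ([]string, error) {
    	for ctx.Err() == nil {
    		descs, err := fetch(ctx)
    		if err == nil {
    			return descs, nil
    		}
    		if isAuthError(err) {
    			// Authentication or authorization error. Propagate.
    			return nil, err
    		}
    		// Soft RPC error: log and retry (the real code also drops the client here).
    		log.Printf("error consuming GetRangeDescriptors RPC: %v", err)
    	}
    	return nil, ctx.Err()
    }

    func main() {
    	authErr := status.Error(codes.PermissionDenied, "span not fully contained in tenant keyspace")
    	_, err := fetchWithRetry(context.Background(), func(context.Context) ([]string, error) {
    		return nil, authErr
    	})
    	fmt.Println(err) // returned immediately instead of retrying forever
    }

With this check in place, a caller that passes a span outside its keyspace gets the error back immediately instead of the connector spinning on retries.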

Release note: None

Co-authored-by: David Hartunian <[email protected]>
Co-authored-by: Evan Wall <[email protected]>
3 people committed Mar 22, 2023
3 parents f592975 + 2353a6a + f3b8baf commit 1e2ea17
Showing 6 changed files with 69 additions and 42 deletions.
7 changes: 5 additions & 2 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
@@ -536,9 +536,12 @@ func (c *Connector) NewIterator(
         curIdx: 0,
       }, nil
     }
-    // TODO(arul): We probably don't want to treat all errors here as "soft".
-    // Soft RPC error. Drop client and retry.
     log.Warningf(ctx, "error consuming GetRangeDescriptors RPC: %v", err)
+    if grpcutil.IsAuthError(err) {
+      // Authentication or authorization error. Propagate.
+      return nil, err
+    }
+    // Soft RPC error. Drop client and retry.
     c.tryForgetClient(ctx, client)
     break
   }
36 changes: 27 additions & 9 deletions pkg/ccl/kvccl/kvtenantccl/tenant_scan_range_descriptors_test.go
@@ -16,18 +16,16 @@ import (
   "github.com/cockroachdb/cockroach/pkg/keys"
   "github.com/cockroachdb/cockroach/pkg/kv/kvserver"
   "github.com/cockroachdb/cockroach/pkg/roachpb"
+  "github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
   "github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
   "github.com/cockroachdb/cockroach/pkg/util/leaktest"
   "github.com/cockroachdb/cockroach/pkg/util/rangedesc"
   "github.com/stretchr/testify/require"
 )

-// TestScanRangeDescriptors is an integration test to ensure that tenants can
-// scan range descriptors iff they correspond to tenant owned ranges.
-func TestScanRangeDescriptors(t *testing.T) {
-  defer leaktest.AfterTest(t)()
-
-  ctx := context.Background()
+func setup(
+  t *testing.T, ctx context.Context,
+) (*testcluster.TestCluster, serverutils.TestTenantInterface, rangedesc.IteratorFactory) {
   tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
     ServerArgs: base.TestServerArgs{
       DisableDefaultTestTenant: true, // we're going to manually add tenants
@@ -38,16 +36,26 @@ func TestScanRangeDescriptors(t *testing.T) {
       },
     },
   })
-  defer tc.Stopper().Stop(ctx)

   ten2ID := roachpb.MustMakeTenantID(2)
   tenant2, err := tc.Server(0).StartTenant(ctx, base.TestTenantArgs{
     TenantID: ten2ID,
   })
   require.NoError(t, err)
+  return tc, tenant2, tenant2.RangeDescIteratorFactory().(rangedesc.IteratorFactory)
+}
+
+// TestScanRangeDescriptors is an integration test to ensure that tenants can
+// scan range descriptors iff they correspond to tenant owned ranges.
+func TestScanRangeDescriptors(t *testing.T) {
+  defer leaktest.AfterTest(t)()
+
+  ctx := context.Background()
+  tc, tenant2, iteratorFactory := setup(t, ctx)
+  defer tc.Stopper().Stop(ctx)

   // Split some ranges within tenant2 that we'll scan over.
-  ten2Codec := keys.MakeSQLCodec(ten2ID)
+  ten2Codec := tenant2.Codec()
   ten2Split1 := append(ten2Codec.TenantPrefix(), 'a')
   ten2Split2 := append(ten2Codec.TenantPrefix(), 'b')
   {
@@ -56,7 +64,6 @@ func TestScanRangeDescriptors(t *testing.T) {
     tc.SplitRangeOrFatal(t, ten2Codec.TenantPrefix().PrefixEnd()) // Last range
   }

-  iteratorFactory := tenant2.RangeDescIteratorFactory().(rangedesc.IteratorFactory)
   iter, err := iteratorFactory.NewIterator(ctx, ten2Codec.TenantSpan())
   require.NoError(t, err)

@@ -112,3 +119,14 @@ func TestScanRangeDescriptors(t *testing.T) {
     rangeDescs[numRanges-1].StartKey,
   )
 }
+
+func TestScanRangeDescriptorsOutsideTenantKeyspace(t *testing.T) {
+  defer leaktest.AfterTest(t)()
+
+  ctx := context.Background()
+  tc, _, iteratorFactory := setup(t, ctx)
+  defer tc.Stopper().Stop(ctx)
+
+  _, err := iteratorFactory.NewIterator(ctx, keys.EverythingSpan)
+  require.ErrorContains(t, err, "requested key span /M{in-ax} not fully contained in tenant keyspace /Tenant/{2-3}")
+}
10 changes: 5 additions & 5 deletions pkg/server/node_http_router_test.go
@@ -53,31 +53,31 @@ func TestRouteToNode(t *testing.T) {
       sourceServerID: 1,
       nodeIDRequestedInCookie: "local",
       expectStatusCode: 200,
-      expectRegex: regexp.MustCompile(`ranges_underreplicated{store="2"}`),
+      expectRegex: regexp.MustCompile(`ranges_underreplicated{store="2",node_id="2"}`),
     },
     {
       name: "remote _status/vars on node 2 from node 1 using cookie",
       path: "/_status/vars",
       sourceServerID: 0,
       nodeIDRequestedInCookie: "2",
       expectStatusCode: 200,
-      expectRegex: regexp.MustCompile(`ranges_underreplicated{store="2"}`),
+      expectRegex: regexp.MustCompile(`ranges_underreplicated{store="2",node_id="2"}`),
     },
     {
       name: "remote _status/vars on node 1 from node 2 using cookie",
       path: "/_status/vars",
       sourceServerID: 1,
       nodeIDRequestedInCookie: "1",
       expectStatusCode: 200,
-      expectRegex: regexp.MustCompile(`ranges_underreplicated{store="1"}`),
+      expectRegex: regexp.MustCompile(`ranges_underreplicated{store="1",node_id="1"}`),
     },
     {
       name: "remote _status/vars on node 2 from node 1 using query param",
       path: "/_status/vars",
       sourceServerID: 0,
       nodeIDRequestedInQueryParam: "2",
       expectStatusCode: 200,
-      expectRegex: regexp.MustCompile(`ranges_underreplicated{store="2"}`),
+      expectRegex: regexp.MustCompile(`ranges_underreplicated{store="2",node_id="2"}`),
     },
     {
       name: "query param overrides cookie",
@@ -86,7 +86,7 @@
       nodeIDRequestedInCookie: "local",
       nodeIDRequestedInQueryParam: "2",
       expectStatusCode: 200,
-      expectRegex: regexp.MustCompile(`ranges_underreplicated{store="2"}`),
+      expectRegex: regexp.MustCompile(`ranges_underreplicated{store="2",node_id="2"}`),
     },
     {
       name: "remote / root HTML on node 2 from node 1 using cookie",
6 changes: 6 additions & 0 deletions pkg/server/status/recorder.go
@@ -227,6 +227,12 @@ func (mr *MetricsRecorder) AddNode(
   nodeIDGauge.Update(int64(desc.NodeID))
   reg.AddMetric(nodeIDGauge)
   reg.AddLabel("tenant", mr.tenantNameContainer)
+  reg.AddLabel("node_id", strconv.Itoa(int(desc.NodeID)))
+  // We assume that all stores have been added to the registry
+  // prior to calling `AddNode`.
+  for _, s := range mr.mu.storeRegistries {
+    s.AddLabel("node_id", strconv.Itoa(int(desc.NodeID)))
+  }
 }

 // AddStore adds the Registry from the provided store as a store-level registry
32 changes: 16 additions & 16 deletions pkg/server/status/recorder_test.go
@@ -99,10 +99,10 @@ func (fs fakeStore) Registry() *metric.Registry {
   return fs.registry
 }

-func TestMetricsRecorderTenants(t *testing.T) {
+func TestMetricsRecorderLabels(t *testing.T) {
   defer leaktest.AfterTest(t)()
   nodeDesc := roachpb.NodeDescriptor{
-    NodeID: roachpb.NodeID(1),
+    NodeID: roachpb.NodeID(7),
   }
   reg1 := metric.NewRegistry()
   manual := timeutil.NewManualTime(timeutil.Unix(0, 100))
@@ -118,7 +118,7 @@ func TestMetricsRecorderTenants(t *testing.T) {
   recorder.AddNode(reg1, nodeDesc, 50, "foo:26257", "foo:26258", "foo:5432")

   nodeDescTenant := roachpb.NodeDescriptor{
-    NodeID: roachpb.NodeID(1),
+    NodeID: roachpb.NodeID(7),
   }
   regTenant := metric.NewRegistry()
   stTenant := cluster.MakeTestingClusterSettings()
@@ -154,15 +154,15 @@ func TestMetricsRecorderTenants(t *testing.T) {
   err = recorder.PrintAsText(buf)
   require.NoError(t, err)

-  require.Contains(t, buf.String(), `some_metric{tenant="system"} 123`)
-  require.Contains(t, buf.String(), `some_metric{tenant="application"} 456`)
+  require.Contains(t, buf.String(), `some_metric{tenant="system",node_id="7"} 123`)
+  require.Contains(t, buf.String(), `some_metric{tenant="application",node_id="7"} 456`)

   bufTenant := bytes.NewBuffer([]byte{})
   err = recorderTenant.PrintAsText(bufTenant)
   require.NoError(t, err)

-  require.NotContains(t, bufTenant.String(), `some_metric{tenant="system"} 123`)
-  require.Contains(t, bufTenant.String(), `some_metric{tenant="application"} 456`)
+  require.NotContains(t, bufTenant.String(), `some_metric{tenant="system",node_id="7"} 123`)
+  require.Contains(t, bufTenant.String(), `some_metric{tenant="application",node_id="7"} 456`)

   // Update app name in container and ensure
   // output changes accordingly.
@@ -172,15 +172,15 @@ func TestMetricsRecorderTenants(t *testing.T) {
   err = recorder.PrintAsText(buf)
   require.NoError(t, err)

-  require.Contains(t, buf.String(), `some_metric{tenant="system"} 123`)
-  require.Contains(t, buf.String(), `some_metric{tenant="application2"} 456`)
+  require.Contains(t, buf.String(), `some_metric{tenant="system",node_id="7"} 123`)
+  require.Contains(t, buf.String(), `some_metric{tenant="application2",node_id="7"} 456`)

   bufTenant = bytes.NewBuffer([]byte{})
   err = recorderTenant.PrintAsText(bufTenant)
   require.NoError(t, err)

-  require.NotContains(t, bufTenant.String(), `some_metric{tenant="system"} 123`)
-  require.Contains(t, bufTenant.String(), `some_metric{tenant="application2"} 456`)
+  require.NotContains(t, bufTenant.String(), `some_metric{tenant="system",node_id="7"} 123`)
+  require.Contains(t, bufTenant.String(), `some_metric{tenant="application2",node_id="7"} 456`)

   // ========================================
   // Verify that the recorder processes tenant time series registries
@@ -190,17 +190,17 @@ func TestMetricsRecorderTenants(t *testing.T) {
     // System tenant metrics
     {
       Name: "cr.node.node-id",
-      Source: "1",
+      Source: "7",
       Datapoints: []tspb.TimeSeriesDatapoint{
         {
           TimestampNanos: manual.Now().UnixNano(),
-          Value: float64(1),
+          Value: float64(7),
         },
       },
     },
     {
       Name: "cr.node.some_metric",
-      Source: "1",
+      Source: "7",
       Datapoints: []tspb.TimeSeriesDatapoint{
         {
           TimestampNanos: manual.Now().UnixNano(),
@@ -211,7 +211,7 @@ func TestMetricsRecorderTenants(t *testing.T) {
     // App tenant metrics
     {
       Name: "cr.node.node-id",
-      Source: "1-123",
+      Source: "7-123",
       Datapoints: []tspb.TimeSeriesDatapoint{
         {
           TimestampNanos: manual.Now().UnixNano(),
@@ -221,7 +221,7 @@ func TestMetricsRecorderTenants(t *testing.T) {
     },
     {
       Name: "cr.node.some_metric",
-      Source: "1-123",
+      Source: "7-123",
       Datapoints: []tspb.TimeSeriesDatapoint{
         {
           TimestampNanos: manual.Now().UnixNano(),
20 changes: 10 additions & 10 deletions pkg/server/status_test.go
@@ -1339,20 +1339,20 @@ func TestStatusVarsTxnMetrics(t *testing.T) {
   if err != nil {
     t.Fatal(err)
   }
-  if !bytes.Contains(body, []byte("sql_txn_begin_count{tenant=\"system\"} 1")) {
-    t.Errorf("expected `sql_txn_begin_count{tenant=\"system\"} 1`, got: %s", body)
+  if !bytes.Contains(body, []byte("sql_txn_begin_count{tenant=\"system\",node_id=\"1\"} 1")) {
+    t.Errorf("expected `sql_txn_begin_count{tenant=\"system\",node_id=\"1\"} 1`, got: %s", body)
   }
-  if !bytes.Contains(body, []byte("sql_restart_savepoint_count{tenant=\"system\"} 1")) {
-    t.Errorf("expected `sql_restart_savepoint_count{tenant=\"system\"} 1`, got: %s", body)
+  if !bytes.Contains(body, []byte("sql_restart_savepoint_count{tenant=\"system\",node_id=\"1\"} 1")) {
+    t.Errorf("expected `sql_restart_savepoint_count{tenant=\"system\",node_id=\"1\"} 1`, got: %s", body)
   }
-  if !bytes.Contains(body, []byte("sql_restart_savepoint_release_count{tenant=\"system\"} 1")) {
-    t.Errorf("expected `sql_restart_savepoint_release_count{tenant=\"system\"} 1`, got: %s", body)
+  if !bytes.Contains(body, []byte("sql_restart_savepoint_release_count{tenant=\"system\",node_id=\"1\"} 1")) {
+    t.Errorf("expected `sql_restart_savepoint_release_count{tenant=\"system\",node_id=\"1\"} 1`, got: %s", body)
   }
-  if !bytes.Contains(body, []byte("sql_txn_commit_count{tenant=\"system\"} 1")) {
-    t.Errorf("expected `sql_txn_commit_count{tenant=\"system\"} 1`, got: %s", body)
+  if !bytes.Contains(body, []byte("sql_txn_commit_count{tenant=\"system\",node_id=\"1\"} 1")) {
+    t.Errorf("expected `sql_txn_commit_count{tenant=\"system\",node_id=\"1\"} 1`, got: %s", body)
   }
-  if !bytes.Contains(body, []byte("sql_txn_rollback_count{tenant=\"system\"} 0")) {
-    t.Errorf("expected `sql_txn_rollback_count{tenant=\"system\"} 0`, got: %s", body)
+  if !bytes.Contains(body, []byte("sql_txn_rollback_count{tenant=\"system\",node_id=\"1\"} 0")) {
+    t.Errorf("expected `sql_txn_rollback_count{tenant=\"system\",node_id=\"1\"} 0`, got: %s", body)
   }
 }
