Skip to content

Commit

Permalink
spanconfig: limit # of tenant span configs
Browse files Browse the repository at this point in the history
Fixes #70555. In order to limit the number of span configs a tenant's
able to install, we introduce a tenant-side spanconfig.Limiter. It
presents the following interface:

    // Limiter is used to limit the number of span configs installed by
    // secondary tenants. It takes in a delta (typically the difference
    // in span configs between the committed and uncommitted state in
    // the txn), uses it to maintain an aggregate counter, and informs
    // the caller if exceeding the prescribed limit.
    type Limiter interface {
      ShouldLimit(
        ctx context.Context, txn *kv.Txn, delta int,
      ) (bool, error)
    }

The delta is computed using a static helper, spanconfig.Delta:

    // Delta considers both the committed and uncommitted state of a
    // table descriptor and computes the difference in the number of
    // spans we can apply a configuration over.
    func Delta(
      ctx context.Context, s Splitter,
      committed, uncommitted catalog.TableDescriptor,
    ) (int, error)

This limiter only applies to secondary tenants. The counter is
maintained in a newly introduced (tenant-only) system table, using the
following schema:

    CREATE TABLE system.span_count (
      singleton  BOOL DEFAULT TRUE,
      span_count INT NOT NULL,
      CONSTRAINT "primary" PRIMARY KEY (singleton),
      CONSTRAINT single_row CHECK (singleton),
      FAMILY "primary" (singleton, span_count)
    );

We need just two integration points for spanconfig.Limiter:
- Right above CheckTwoVersionInvariant, where we're able to hook into
  the committed and to-be-committed descriptor state before txn commit;
- In the GC job, when gc-ing table state. We decrement a table's split
  count when GC-ing the table for good.

The per-tenant span config limit used is controlled by a new tenant
read-only cluster setting: spanconfig.tenant_limit. Multi-tenant cluster
settings (#73857) provides the infrastructure for the host tenant to be
able to control this setting cluster wide, or to target a specific
tenant at a time.

We also need a migration here, to start tracking span counts for
clusters with pre-existing tenants. We introduce a migration that scans
over all table descriptors and seeds system.span_count with the right
value. Given cluster version gates disseminate asynchronously, we also
need a preliminary version to start tracking incremental changes.

It's useful to introduce the notion of debt. This will be handy if/when
we lower per-tenant limits, and also in the migration above since it's
possible for pre-existing tenants to have committed state in violation
of the prescribed limit. When in debt, schema changes that add new
splits will be rejected (dropping tables/indexes/partitions/etc. will
work just fine).

When attempting a txn that goes over the configured limit, the UX is as
follows:

    > CREATE TABLE db.t42(i INT PRIMARY KEY);
    pq: exceeded limit for number of table spans

Release note: None
Release justification: low risk, high benefit change

Release note: None
  • Loading branch information
irfansharif committed Feb 18, 2022
1 parent 03f9e53 commit 4823f8c
Show file tree
Hide file tree
Showing 70 changed files with 1,476 additions and 62 deletions.
2 changes: 1 addition & 1 deletion docs/generated/settings/settings-for-tenants.txt
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using
trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.
trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https://<ui>/#/debug/tracez
trace.zipkin.collector string the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.
version version 21.2-106 set the active cluster version in the format '<major>.<minor>'
version version 21.2-112 set the active cluster version in the format '<major>.<minor>'
2 changes: 1 addition & 1 deletion docs/generated/settings/settings.html
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,6 @@
<tr><td><code>trace.opentelemetry.collector</code></td><td>string</td><td><code></code></td><td>address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as <host>:<port>. If no port is specified, 4317 will be used.</td></tr>
<tr><td><code>trace.span_registry.enabled</code></td><td>boolean</td><td><code>true</code></td><td>if set, ongoing traces can be seen at https://<ui>/#/debug/tracez</td></tr>
<tr><td><code>trace.zipkin.collector</code></td><td>string</td><td><code></code></td><td>the address of a Zipkin instance to receive traces, as <host>:<port>. If no port is specified, 9411 will be used.</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-106</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
<tr><td><code>version</code></td><td>version</td><td><code>21.2-112</code></td><td>set the active cluster version in the format '<major>.<minor>'</td></tr>
</tbody>
</table>
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ ALL_TESTS = [
"//pkg/ccl/serverccl/statusccl:statusccl_test",
"//pkg/ccl/serverccl:serverccl_test",
"//pkg/ccl/spanconfigccl/spanconfigcomparedccl:spanconfigcomparedccl_test",
"//pkg/ccl/spanconfigccl/spanconfiglimiterccl:spanconfiglimiterccl_test",
"//pkg/ccl/spanconfigccl/spanconfigreconcilerccl:spanconfigreconcilerccl_test",
"//pkg/ccl/spanconfigccl/spanconfigsplitterccl:spanconfigsplitterccl_test",
"//pkg/ccl/spanconfigccl/spanconfigsqltranslatorccl:spanconfigsqltranslatorccl_test",
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/backupccl/system_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,9 @@ var systemTableBackupConfiguration = map[string]systemBackupConfiguration{
shouldIncludeInClusterBackup: optInToClusterBackup,
customRestoreFunc: tenantSettingsTableRestoreFunc,
},
systemschema.SpanCountTable.GetName(): {
shouldIncludeInClusterBackup: optOutOfClusterBackup,
},
}

// GetSystemTablesToIncludeInClusterBackup returns a set of system table names that
Expand Down
3 changes: 3 additions & 0 deletions pkg/ccl/migrationccl/migrationsccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go_test(
name = "migrationsccl_test",
srcs = [
"main_test.go",
"seed_span_counts_external_test.go",
"seed_tenant_span_configs_external_test.go",
],
deps = [
Expand All @@ -15,7 +16,9 @@ go_test(
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
Expand Down
252 changes: 252 additions & 0 deletions pkg/ccl/migrationccl/migrationsccl/seed_span_counts_external_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
// Copyright 2022 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package migrationsccl_test

import (
"context"
gosql "database/sql"
"net/url"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

// TestPreSeedSpanCountTable tests that incremental schema changes after
// PreSeedSpanCountTable is enabled get tracked as such. It also tests that once
// SeedSpanCountTable is reached, the span count is updated to capture the most
// up-to-date view of all schema objects. Specifically, we're not
// double-counting the incremental update we tracked in the
// PreSeedSpanCountTable state.
func TestPreSeedSpanCountTable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var (
v0 = clusterversion.ByKey(clusterversion.SpanCountTable)
v1 = clusterversion.ByKey(clusterversion.PreSeedSpanCountTable)
v2 = clusterversion.ByKey(clusterversion.SeedSpanCountTable)
)

ctx := context.Background()
settings := cluster.MakeTestingClusterSettingsWithVersions(v2, v0, false /* initializeVersion */)
require.NoError(t, clusterversion.Initialize(ctx, v0, &settings.SV))

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: make(chan struct{}),
BinaryVersionOverride: v0,
},
},
},
})

defer tc.Stopper().Stop(ctx)
ts := tc.Server(0)
hostDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))

tenantID := roachpb.MakeTenantID(10)
tenantSettings := cluster.MakeTestingClusterSettingsWithVersions(v2, v0, false /* initializeVersion */)
require.NoError(t, clusterversion.Initialize(ctx, v0, &tenantSettings.SV))
tenant, err := ts.StartTenant(ctx, base.TestTenantArgs{
TenantID: tenantID,
TestingKnobs: base.TestingKnobs{},
Settings: tenantSettings,
})
require.NoError(t, err)

pgURL, cleanupPGUrl := sqlutils.PGUrl(t, tenant.SQLAddr(), "Tenant", url.User(security.RootUser))
defer cleanupPGUrl()

tenantSQLDB, err := gosql.Open("postgres", pgURL.String())
require.NoError(t, err)
defer func() { require.NoError(t, tenantSQLDB.Close()) }()

// Upgrade the host cluster all the way.
hostDB.Exec(t, "SET CLUSTER SETTING version = $1", v2.String())

var spanCount, numRows int
tenantDB := sqlutils.MakeSQLRunner(tenantSQLDB)

tenantDB.CheckQueryResults(t, "SHOW CLUSTER SETTING version", [][]string{{v0.String()}})
tenantDB.Exec(t, `CREATE TABLE t(k INT PRIMARY KEY)`)
tenantDB.QueryRow(t, `SELECT count(*) FROM system.span_count`).Scan(&numRows)
require.Equal(t, 0, numRows)

tenantDB.Exec(t, "SET CLUSTER SETTING version = $1", v1.String())
tenantDB.CheckQueryResults(t, "SHOW CLUSTER SETTING version", [][]string{{v1.String()}})
tenantDB.Exec(t, `CREATE INDEX idx ON t (k)`)
tenantDB.QueryRow(t, `SELECT span_count FROM system.span_count LIMIT 1`).Scan(&spanCount)
require.Equal(t, 2, spanCount)

tenantDB.Exec(t, "SET CLUSTER SETTING version = $1", v2.String())
tenantDB.CheckQueryResults(t, "SHOW CLUSTER SETTING version", [][]string{{v2.String()}})
tenantDB.QueryRow(t, `SELECT span_count FROM system.span_count LIMIT 1`).Scan(&spanCount)
require.Equal(t, 5, spanCount)
}

// TestSeedSpanCountTable tests that the migration seeds system.span_count
// correctly for secondary tenants.
func TestSeedSpanCountTable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var (
v0 = clusterversion.ByKey(clusterversion.SpanCountTable)
v2 = clusterversion.ByKey(clusterversion.SeedSpanCountTable)
)

ctx := context.Background()
settings := cluster.MakeTestingClusterSettingsWithVersions(v2, v0, false /* initializeVersion */)
require.NoError(t, clusterversion.Initialize(ctx, v0, &settings.SV))

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: make(chan struct{}),
BinaryVersionOverride: v0,
},
},
},
})

defer tc.Stopper().Stop(ctx)
ts := tc.Server(0)
hostDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))

tenantID := roachpb.MakeTenantID(10)
tenantSettings := cluster.MakeTestingClusterSettingsWithVersions(v2, v0, false /* initializeVersion */)
require.NoError(t, clusterversion.Initialize(ctx, v0, &tenantSettings.SV))
tenant, err := ts.StartTenant(ctx, base.TestTenantArgs{
TenantID: tenantID,
TestingKnobs: base.TestingKnobs{},
Settings: tenantSettings,
})
require.NoError(t, err)

pgURL, cleanupPGUrl := sqlutils.PGUrl(t, tenant.SQLAddr(), "Tenant", url.User(security.RootUser))
defer cleanupPGUrl()

tenantSQLDB, err := gosql.Open("postgres", pgURL.String())
require.NoError(t, err)
defer func() { require.NoError(t, tenantSQLDB.Close()) }()

tenantDB := sqlutils.MakeSQLRunner(tenantSQLDB)
tenantDB.CheckQueryResults(t, "SHOW CLUSTER SETTING version", [][]string{{v0.String()}})

// Upgrade the host cluster.
hostDB.Exec(t, "SET CLUSTER SETTING version = $1", v2.String())
tenantDB.CheckQueryResults(t, "SHOW CLUSTER SETTING version", [][]string{{v0.String()}})

tenantDB.Exec(t, `CREATE TABLE t(k INT PRIMARY KEY)`)

var spanCount, numRows int
tenantDB.QueryRow(t, `SELECT count(*) FROM system.span_count`).Scan(&numRows)
require.Equal(t, 0, numRows)

tenantDB.Exec(t, "SET CLUSTER SETTING version = $1", v2.String())
tenantDB.QueryRow(t, `SELECT span_count FROM system.span_count LIMIT 1`).Scan(&spanCount)
require.Equal(t, 3, spanCount)
}

// TestSeedSpanCountTableOverLimit tests that the migration seeds
// system.span_count correctly for secondary tenants, even if over the
// proscribed limit. In these cases the tenant goes into debt -- all subsequent
// schema changes that add schema elements will be rejected. Attempts to free up
// spans however will be accepted.
func TestSeedSpanCountTableOverLimit(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

var (
v0 = clusterversion.ByKey(clusterversion.SpanCountTable)
v2 = clusterversion.ByKey(clusterversion.SeedSpanCountTable)
)

ctx := context.Background()
settings := cluster.MakeTestingClusterSettingsWithVersions(v2, v0, false /* initializeVersion */)
require.NoError(t, clusterversion.Initialize(ctx, v0, &settings.SV))

tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Settings: settings,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: make(chan struct{}),
BinaryVersionOverride: v0,
},
},
},
})

defer tc.Stopper().Stop(ctx)
ts := tc.Server(0)
hostDB := sqlutils.MakeSQLRunner(tc.ServerConn(0))

const limit = 1
tenantID := roachpb.MakeTenantID(10)
tenantSettings := cluster.MakeTestingClusterSettingsWithVersions(v2, v0, false /* initializeVersion */)
require.NoError(t, clusterversion.Initialize(ctx, v0, &tenantSettings.SV))
tenant, err := ts.StartTenant(ctx, base.TestTenantArgs{
TenantID: tenantID,
TestingKnobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
ExcludeDroppedDescriptorsFromLookup: true,
LimiterLimitOverride: func() int64 {
return limit
},
},
},
Settings: tenantSettings,
})
require.NoError(t, err)

pgURL, cleanupPGUrl := sqlutils.PGUrl(t, tenant.SQLAddr(), "Tenant", url.User(security.RootUser))
defer cleanupPGUrl()

tenantSQLDB, err := gosql.Open("postgres", pgURL.String())
require.NoError(t, err)
defer func() { require.NoError(t, tenantSQLDB.Close()) }()

// Upgrade the host cluster.
hostDB.Exec(t, "SET CLUSTER SETTING version = $1", v2.String())

tenantDB := sqlutils.MakeSQLRunner(tenantSQLDB)
tenantDB.Exec(t, `CREATE TABLE t1(k INT PRIMARY KEY)`)
tenantDB.Exec(t, `CREATE TABLE t2(k INT PRIMARY KEY)`)
tenantDB.Exec(t, `CREATE TABLE t3(k INT PRIMARY KEY)`)

var spanCount int
tenantDB.Exec(t, "SET CLUSTER SETTING version = $1", v2.String())
tenantDB.QueryRow(t, `SELECT span_count FROM system.span_count LIMIT 1`).Scan(&spanCount)
require.Equal(t, 9, spanCount)

_, err = tenantDB.DB.ExecContext(ctx, `CREATE TABLE t4(k INT PRIMARY KEY)`)
require.True(t, testutils.IsError(err, "exceeded limit for number of table spans"))

tenantDB.Exec(t, `DROP TABLE t3`)
tenantDB.QueryRow(t, `SELECT span_count FROM system.span_count LIMIT 1`).Scan(&spanCount)
require.Equal(t, 6, spanCount)
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ diff offset=48
+/Tenant/11/Table/43 database system (tenant)
+/Tenant/11/Table/44 database system (tenant)
+/Tenant/11/Table/46 database system (tenant)
+/Tenant/11/Table/50 database system (tenant)
+/Tenant/11/Table/106 ttl_seconds=1000 num_replicas=42
+/Tenant/11/Table/107 range default

Expand Down
37 changes: 37 additions & 0 deletions pkg/ccl/spanconfigccl/spanconfiglimiterccl/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
load("@io_bazel_rules_go//go:def.bzl", "go_test")

go_test(
name = "spanconfiglimiterccl_test",
srcs = [
"datadriven_test.go",
"drop_table_test.go",
"main_test.go",
],
data = glob(["testdata/**"]),
deps = [
"//pkg/base",
"//pkg/ccl/kvccl/kvtenantccl",
"//pkg/ccl/partitionccl",
"//pkg/ccl/utilccl",
"//pkg/config",
"//pkg/config/zonepb",
"//pkg/keys",
"//pkg/roachpb",
"//pkg/security",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/spanconfig",
"//pkg/spanconfig/spanconfigtestutils/spanconfigtestcluster",
"//pkg/sql/gcjob",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
Loading

0 comments on commit 4823f8c

Please sign in to comment.