Skip to content

Commit

Permalink
tenantsettingswatcher: remove the version gate
Browse files Browse the repository at this point in the history
This commit removes the version gate for the tenant settings.

Release note: None
  • Loading branch information
yuzefovich committed Jul 6, 2022
1 parent f6d0d84 commit ad06d0c
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 247 deletions.
6 changes: 0 additions & 6 deletions pkg/clusterversion/cockroach_versions.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,6 @@ const (
// on lease transfer Raft proposals. New leaseholders now forward their clock
// directly to the new lease start time.
DontProposeWriteTimestampForLeaseTransfers
// TenantSettingsTable adds the system table for tracking tenant usage.
TenantSettingsTable
// EnablePebbleFormatVersionBlockProperties enables a new Pebble SSTable
// format version for block property collectors.
// NB: this cluster version is paired with PebbleFormatBlockPropertyCollector
Expand Down Expand Up @@ -540,10 +538,6 @@ var versionsSingleton = keyedVersions{
Key: DontProposeWriteTimestampForLeaseTransfers,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 60},
},
{
Key: TenantSettingsTable,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 62},
},
{
Key: EnablePebbleFormatVersionBlockProperties,
Version: roachpb.Version{Major: 21, Minor: 2, Internal: 64},
Expand Down
77 changes: 38 additions & 39 deletions pkg/clusterversion/key_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 0 additions & 4 deletions pkg/server/tenantsettingswatcher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/server/tenantsettingswatcher",
visibility = ["//visibility:public"],
deps = [
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/kv/kvclient/rangefeed",
"//pkg/kv/kvclient/rangefeed/rangefeedbuffer",
Expand Down Expand Up @@ -44,7 +43,6 @@ go_test(
embed = [":tenantsettingswatcher"],
deps = [
"//pkg/base",
"//pkg/clusterversion",
"//pkg/keys",
"//pkg/roachpb",
"//pkg/security/securityassets",
Expand All @@ -53,12 +51,10 @@ go_test(
"//pkg/settings",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/leaktest",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
)
67 changes: 6 additions & 61 deletions pkg/server/tenantsettingswatcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,7 @@ package tenantsettingswatcher

import (
"context"
"sync"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed/rangefeedbuffer"
Expand Down Expand Up @@ -78,56 +76,14 @@ func New(

// Start will start the Watcher.
//
// If the current cluster version indicates that we have a tenant settings
// table, this function sets up the rangefeed and waits for the initial scan. An
// error will be returned if the initial table scan hits an error, the context
// is canceled or the stopper is stopped prior to the initial data being
// retrieved.
//
// Otherwise, Start sets up a background task that waits for the right version
// and starts the rangefeed when appropriate. WaitUntilStarted can be used to
// wait for the rangefeed setup.
// This function sets up the rangefeed and waits for the initial scan. An error
// will be returned if the initial table scan hits an error, the context is
// canceled or the stopper is stopped prior to the initial data being retrieved.
func (w *Watcher) Start(ctx context.Context, sysTableResolver catalog.SystemTableIDResolver) error {
w.startCh = make(chan struct{})
if w.st.Version.IsActive(ctx, clusterversion.TenantSettingsTable) {
// We are not in a mixed-version scenario; start the rangefeed now.
w.startErr = w.startRangeFeed(ctx, sysTableResolver)
close(w.startCh)
return w.startErr
}
// Set up an on-change callback that closes this channel once the version
// supports tenant settings.
versionOkCh := make(chan struct{})
var once sync.Once
w.st.Version.SetOnChange(func(ctx context.Context, newVersion clusterversion.ClusterVersion) {
if newVersion.IsActive(clusterversion.TenantSettingsTable) {
once.Do(func() {
close(versionOkCh)
})
}
})
// Now check the version again, in case the version changed just before
// SetOnChange.
if w.st.Version.IsActive(ctx, clusterversion.TenantSettingsTable) {
w.startErr = w.startRangeFeed(ctx, sysTableResolver)
close(w.startCh)
return w.startErr
}
return w.stopper.RunAsyncTask(ctx, "tenantsettingswatcher-start", func(ctx context.Context) {
log.Infof(ctx, "tenantsettingswatcher waiting for the appropriate version")
select {
case <-versionOkCh:
case <-w.stopper.ShouldQuiesce():
return
}
log.Infof(ctx, "tenantsettingswatcher can now start")
w.startErr = w.startRangeFeed(ctx, sysTableResolver)
if w.startErr != nil {
// We are not equipped to handle this error asynchronously.
log.Warningf(ctx, "error starting tenantsettingswatcher rangefeed: %v", w.startErr)
}
close(w.startCh)
})
w.startErr = w.startRangeFeed(ctx, sysTableResolver)
close(w.startCh)
return w.startErr
}

// startRangeFeed starts the range feed and waits for the initial table scan. An
Expand Down Expand Up @@ -237,11 +193,6 @@ func (w *Watcher) startRangeFeed(

// WaitForStart waits until the rangefeed is set up. Returns an error if the
// rangefeed setup failed.
//
// If the cluster version does not support tenant settings, returns immediately
// with no error. Note that it is still legal to call GetTenantOverrides and
// GetAllTenantOverrides in this state. When the cluster version is upgraded,
// the settings will start being updated.
func (w *Watcher) WaitForStart(ctx context.Context) error {
// Fast path check.
select {
Expand All @@ -252,12 +203,6 @@ func (w *Watcher) WaitForStart(ctx context.Context) error {
if w.startCh == nil {
return errors.AssertionFailedf("Start() was not yet called")
}
if !w.st.Version.IsActive(ctx, clusterversion.TenantSettingsTable) {
// If this happens, then we are running new tenant code against a host
// cluster that was not fully upgraded.
log.Warningf(ctx, "tenant requested settings before host cluster version upgrade")
return nil
}
select {
case <-w.startCh:
return w.startErr
Expand Down
89 changes: 0 additions & 89 deletions pkg/server/tenantsettingswatcher/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,13 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/server/tenantsettingswatcher"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -117,87 +112,3 @@ func TestWatcher(t *testing.T) {
t3Overrides, _ = w.GetTenantOverrides(t3)
expect(t3Overrides, "qux=qux-t3")
}

// TestWatcherWaitForVersion verifies that watcher startup waits for the cluster
// version to be upgraded.
func TestWatcherWaitForVersion(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

var oldVersion = clusterversion.ByKey(clusterversion.V21_2)

disableUpgradeCh := make(chan struct{})
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
BinaryVersionOverride: oldVersion,
DisableAutomaticVersionUpgrade: disableUpgradeCh,
},
},
},
})
defer tc.Stopper().Stop(ctx)

s0 := tc.Server(0)
w := tenantsettingswatcher.New(
s0.Clock(),
s0.ExecutorConfig().(sql.ExecutorConfig).RangeFeedFactory,
s0.Stopper(),
s0.ClusterSettings(),
)

// Start should go in async mode and wait for the version.
err := w.Start(ctx, s0.SystemTableIDResolver().(catalog.SystemTableIDResolver))
require.NoError(t, err)

// Allow upgrade, wait for the table to be created.
close(disableUpgradeCh)
db := tc.ServerConn(0)
testutils.SucceedsSoon(t, func() error {
row := db.QueryRow("SELECT count(*) FROM [SHOW TABLES FROM system] WHERE table_name = 'tenant_settings'")
if row.Err() != nil {
return row.Err()
}
var count int
if err := row.Scan(&count); err != nil {
return err
}
if count == 0 {
return errors.Errorf("tenant_settings table does not exist")
}
return nil
})
// Wait for watcher start.
waitForStartCh := make(chan error)
go func() {
waitForStartCh <- w.WaitForStart(ctx)
}()
select {
case err := <-waitForStartCh:
if err != nil {
t.Fatalf("WaitForStart error: %v", err)
}
case <-time.After(45 * time.Second):
t.Fatalf("WaitForStart did not return after upgrade was allowed")
}

// Set an override and make sure the watcher is working.
_, ch := w.GetAllTenantOverrides()
r := sqlutils.MakeSQLRunner(db)
r.Exec(t, `INSERT INTO system.tenant_settings (tenant_id, name, value, value_type) VALUES (0, 'foo', 'foo', 's')`)
// Wait for the update.
select {
case <-ch:
overrides, _ := w.GetAllTenantOverrides()
expected := roachpb.TenantSetting{
Name: "foo",
Value: settings.EncodedValue{Value: "foo", Type: "s"},
}
if len(overrides) != 1 || overrides[0] != expected {
t.Fatalf("invalid overrides %v", overrides)
}
case <-time.After(45 * time.Second):
t.Fatalf("Did not receive updated tenant overrides")
}
}
12 changes: 5 additions & 7 deletions pkg/sql/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,13 +448,11 @@ func GCTenantSync(ctx context.Context, execCfg *ExecutorConfig, info *descpb.Ten
return errors.Wrapf(err, "deleting tenant %d usage", info.ID)
}

if execCfg.Settings.Version.IsActive(ctx, clusterversion.TenantSettingsTable) {
if _, err := execCfg.InternalExecutor.ExecEx(
ctx, "delete-tenant-settings", txn, sessiondata.NodeUserSessionDataOverride,
`DELETE FROM system.tenant_settings WHERE tenant_id = $1`, info.ID,
); err != nil {
return errors.Wrapf(err, "deleting tenant %d settings", info.ID)
}
if _, err := execCfg.InternalExecutor.ExecEx(
ctx, "delete-tenant-settings", txn, sessiondata.NodeUserSessionDataOverride,
`DELETE FROM system.tenant_settings WHERE tenant_id = $1`, info.ID,
); err != nil {
return errors.Wrapf(err, "deleting tenant %d settings", info.ID)
}

if !execCfg.Settings.Version.IsActive(ctx, clusterversion.PreSeedTenantSpanConfigs) {
Expand Down
1 change: 0 additions & 1 deletion pkg/upgrade/upgrades/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ go_library(
"seed_tenant_span_configs.go",
"span_count_table.go",
"system_privileges.go",
"tenant_settings.go",
"upgrade_sequence_to_be_referenced_by_ID.go",
"upgrades.go",
],
Expand Down
Loading

0 comments on commit ad06d0c

Please sign in to comment.