94774: sql: fix mixed-version behaviour of create_tenant() r=postamar a=postamar

Previously, running create_tenant on the latest binary in
a mixed-version cluster would bootstrap the system schema using the
logic from the latest binary, in which all system database upgrade
migrations have been "baked in".

This might not be compatible with the tenant cluster's version, which
is (correctly) set to the active host cluster version, the idea there
being that tenant cluster versions cannot be greater than host cluster
versions.

This commit addresses this by version-gating the tenant cluster's system
schema bootstrapping, and using hard-coded values when bootstrapping
into the old version.

Informs #94773.

Release note: None
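
As a rough illustration of the gate, a self-contained sketch (the `version` type, the schema slices, and `bootstrapTenantSchema` below are illustrative stand-ins, not the actual bootstrap API):

```go
package main

import "fmt"

// version is a simplified stand-in for a cluster version.
type version struct{ major, minor int }

func (v version) less(o version) bool {
	return v.major < o.major || (v.major == o.major && v.minor < o.minor)
}

// latestVersion is what the running (new) binary would bootstrap by default.
var latestVersion = version{23, 1}

// Hard-coded snapshot of the previous release's system schema vs. the schema
// generated by the current binary (both just labels here).
var (
	previousReleaseSchema = []string{"system.jobs (without job_type column)"}
	latestSchema          = []string{"system.jobs (with job_type column)"}
)

// bootstrapTenantSchema picks the system schema for a newly created tenant
// based on the host cluster's active version: in a mixed-version cluster the
// tenant is bootstrapped with the hard-coded previous-release schema so that
// its system tables stay consistent with its (older) cluster version.
func bootstrapTenantSchema(activeHostVersion version) []string {
	if activeHostVersion.less(latestVersion) {
		return previousReleaseSchema
	}
	return latestSchema
}

func main() {
	fmt.Println(bootstrapTenantSchema(version{22, 2})) // mixed-version host -> old schema
	fmt.Println(bootstrapTenantSchema(version{23, 1})) // fully upgraded host -> latest schema
}
```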

95355: server: always enable sql_instances maintenance r=knz a=dt

Fixes #95571.

Previously, the system.sql_instances table was only maintained by SQL servers operating in "pod" mode, i.e. not in mixed KV and SQL process nodes, where KV-level liveness and gossip provide an alternative means of node discovery that the SQL layer can use when searching for other SQL instances. However, this inconsistency makes it difficult to write correct SQL-level code for remote-node discovery and interaction: in some cases such code needs to consult the instances list, and in others the KV liveness store, which, combined with the complexities of doing so around initialization, dependency injection, etc., becomes hard to maintain.

Additionally, such a design precludes a cluster where some SQL instances are in mixed KV nodes and some are not, as the non-KV nodes would have no way to discover the KV ones. Such deployments are not currently possible but could be in the future.

Instead, this change enables maintenance of the sql_instances table by all SQL servers, whether running in their own processes or embedded in a KV storage node process. This paves the way for making discovery of SQL servers uniform across all SQL server types: they will all be able to simply consult the instances list to find any other SQL servers, regardless of where those servers are running.
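
A minimal sketch of the resulting uniform discovery model (the `instanceRow` and `instanceTable` types below are illustrative stand-ins for system.sql_instances and its storage/reader code, not the real API):

```go
package main

import "fmt"

// instanceRow is a simplified stand-in for a row of system.sql_instances.
type instanceRow struct {
	id       int
	addr     string
	isKVNode bool
}

// instanceTable models the shared discovery table that, after this change,
// every SQL server writes to, whether it runs as a standalone pod or inside
// a KV storage node process.
type instanceTable struct{ rows []instanceRow }

func (t *instanceTable) register(addr string, isKVNode bool) instanceRow {
	row := instanceRow{id: len(t.rows) + 1, addr: addr, isKVNode: isKVNode}
	t.rows = append(t.rows, row)
	return row
}

// discover returns all known SQL servers; callers no longer need to branch
// between the instances list and KV liveness/gossip.
func (t *instanceTable) discover() []instanceRow { return t.rows }

func main() {
	var instances instanceTable
	instances.register("sql-pod-1:26257", false) // standalone SQL pod
	instances.register("kv-node-1:26257", true)  // SQL server embedded in a KV node
	for _, inst := range instances.discover() {
		fmt.Printf("instance %d at %s (kv=%t)\n", inst.id, inst.addr, inst.isKVNode)
	}
}
```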

A follow-up change could simplify DistSQLPhysicalPlanner, specifically the SetupAllNodesPlanning method, which has two different implementations due to the previous inconsistency in the available APIs.

Release note: none.
Epic: CRDB-14537


95528: backupccl: fix flaky TestExcludeDataFromBackupAndRestore r=msbutler a=adityamaru

We don't need to wait for the table to split; inspecting the state of the leaseholder's replica is adequate and a more correct source of truth to rely on.

In some cases the test would not wait for `data.bar` to split into its own range, so it would incorrectly be excluded from the backup, resulting in 0 rows instead of 10 in the final assertion.
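
A rough sketch of the polling pattern the test now relies on (`waitForCondition` is a simplified stand-in for `waitForReplicaFieldToBeSet`, which in the test also takes the test cluster, connection, and table/database names):

```go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// waitForCondition polls check until it succeeds or the timeout expires,
// roughly the shape of waitForReplicaFieldToBeSet: the test asserts on the
// leaseholder replica's applied state instead of waiting for a table split.
func waitForCondition(check func() (bool, error), timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		ok, err := check()
		if ok {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("condition not met before deadline: %w", err)
		}
		time.Sleep(10 * time.Millisecond)
	}
}

func main() {
	// excluded stands in for Replica.ExcludeDataFromBackup() on the leaseholder.
	var excluded atomic.Bool

	// Simulate the span config being applied shortly after
	// ALTER TABLE ... SET (exclude_data_from_backup = true).
	go func() {
		time.Sleep(50 * time.Millisecond)
		excluded.Store(true)
	}()

	err := waitForCondition(func() (bool, error) {
		if !excluded.Load() {
			return false, errors.New("waiting for exclude_data_from_backup to be applied")
		}
		return true, nil
	}, 5*time.Second)
	fmt.Println("err =", err) // err = <nil>
}
```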

Fixes: #95350

Release note: None

Co-authored-by: Marius Posta <[email protected]>
Co-authored-by: David Taylor <[email protected]>
Co-authored-by: adityamaru <[email protected]>
4 people committed Jan 21, 2023
4 parents 1b79102 + 08f2dc4 + d916cc6 + 8eaa89f commit 1982295
Showing 16 changed files with 901 additions and 84 deletions.
2 changes: 2 additions & 0 deletions pkg/BUILD.bazel
@@ -283,6 +283,7 @@ ALL_TESTS = [
"//pkg/spanconfig:spanconfig_test",
"//pkg/sql/backfill:backfill_test",
"//pkg/sql/cacheutil:cacheutil_test",
"//pkg/sql/catalog/bootstrap:bootstrap_test",
"//pkg/sql/catalog/catalogkeys:catalogkeys_test",
"//pkg/sql/catalog/catenumpb:catenumpb_disallowed_imports_test",
"//pkg/sql/catalog/catformat:catformat_test",
@@ -1416,6 +1417,7 @@ GO_TARGETS = [
"//pkg/sql/cacheutil:cacheutil",
"//pkg/sql/cacheutil:cacheutil_test",
"//pkg/sql/catalog/bootstrap:bootstrap",
"//pkg/sql/catalog/bootstrap:bootstrap_test",
"//pkg/sql/catalog/catalogkeys:catalogkeys",
"//pkg/sql/catalog/catalogkeys:catalogkeys_test",
"//pkg/sql/catalog/catenumpb:catenumpb",
14 changes: 9 additions & 5 deletions pkg/ccl/backupccl/backup_test.go
@@ -9251,17 +9251,21 @@ func TestExcludeDataFromBackupAndRestore(t *testing.T) {
// Set foo to exclude_data_from_backup and back it up. The ExportRequest
// should be a noop and backup no data.
sqlDB.Exec(t, `ALTER TABLE data.foo SET (exclude_data_from_backup = true)`)
waitForTableSplit(t, conn, "foo", "data")
waitForReplicaFieldToBeSet(t, tc, conn, "foo", "data", func(r *kvserver.Replica) (bool, error) {
if !r.ExcludeDataFromBackup() {
return false, errors.New("waiting for exclude_data_from_backup to be applied")
return false, errors.New("waiting for the range containing table data.foo to split")
}
return true, nil
})
waitForTableSplit(t, conn, "bar", "data")
sqlDB.Exec(t, `BACKUP DATABASE data TO $1`, localFoo)
waitForReplicaFieldToBeSet(t, tc, conn, "bar", "data", func(r *kvserver.Replica) (bool, error) {
if r.ExcludeDataFromBackup() {
return false, errors.New("waiting for the range containing table data.bar to split")
}
return true, nil
})
sqlDB.Exec(t, `BACKUP DATABASE data INTO $1`, localFoo)

restoreDB.Exec(t, `RESTORE DATABASE data FROM $1`, localFoo)
restoreDB.Exec(t, `RESTORE DATABASE data FROM LATEST IN $1`, localFoo)
require.Len(t, restoreDB.QueryStr(t, `SELECT * FROM data.foo`), 0)
require.Len(t, restoreDB.QueryStr(t, `SELECT * FROM data.bar`), 10)
}
13 changes: 7 additions & 6 deletions pkg/ccl/kvccl/kvtenantccl/tenant_upgrade_test.go
@@ -82,6 +82,7 @@ func TestTenantUpgrade(t *testing.T) {
cleanupPGUrl()
}
}
expectedInitialTenantVersion, _, _ := v0v1v2()
mkTenant := func(t *testing.T, id uint64) (tenantDB *gosql.DB, cleanup func()) {
settings := cluster.MakeTestingClusterSettingsWithVersions(
clusterversion.TestingBinaryVersion,
@@ -90,7 +91,7 @@
)
// Initialize the version to the minimum it could be.
require.NoError(t, clusterversion.Initialize(ctx,
clusterversion.TestingBinaryMinSupportedVersion, &settings.SV))
expectedInitialTenantVersion, &settings.SV))
tenantArgs := base.TestTenantArgs{
TenantID: roachpb.MustMakeTenantID(id),
TestingKnobs: base.TestingKnobs{},
@@ -109,7 +110,7 @@ func TestTenantUpgrade(t *testing.T) {

// Ensure that the tenant works.
initialTenantRunner.CheckQueryResults(t, "SHOW CLUSTER SETTING version",
[][]string{{clusterversion.TestingBinaryMinSupportedVersion.String()}})
[][]string{{expectedInitialTenantVersion.String()}})
initialTenantRunner.Exec(t, "CREATE TABLE t (i INT PRIMARY KEY)")
initialTenantRunner.Exec(t, "INSERT INTO t VALUES (1), (2)")

@@ -172,11 +173,11 @@

}

// Returns two versions v0, v1, v2 which correspond to adjacent releases. v0 will
// equal the TestingBinaryMinSupportedVersion to avoid rot in tests using this
// (as we retire old versions).
// Returns three versions :
// - v0 corresponds to the bootstrapped version of the tenant,
// - v1, v2 correspond to adjacent releases.
func v0v1v2() (roachpb.Version, roachpb.Version, roachpb.Version) {
v0 := clusterversion.TestingBinaryMinSupportedVersion
v0 := clusterversion.ByKey(clusterversion.V22_2)
v1 := clusterversion.TestingBinaryVersion
v2 := clusterversion.TestingBinaryVersion
if v1.Internal > 2 {
26 changes: 18 additions & 8 deletions pkg/jobs/registry.go
@@ -539,18 +539,28 @@ func (r *Registry) CreateJobWithTxn(
}
return p.String()
}
// TODO(jayant): remove this version gate in 24.1
// To run the upgrade below, migration and schema change jobs will need
// to be created using the old schema of the jobs table.
if !r.settings.Version.IsActive(ctx, clusterversion.V23_1AddTypeColumnToJobsTable) {
numCols -= 1
}

// We need to override the database in case we're in a situation where the
// database in question is being dropped.
override := sessiondata.RootUserSessionDataOverride
override.Database = catconstants.SystemDatabaseName
insertStmt := fmt.Sprintf(`INSERT INTO system.jobs (%s) VALUES (%s)`, strings.Join(cols[:numCols], ","), placeholders())
hasJobTypeColumn := r.settings.Version.IsActive(ctx, clusterversion.V23_1AddTypeColumnToJobsTable)
if hasJobTypeColumn {
// Relying on the version gate may not be sufficient.
const pgAttributeStmt = `
SELECT * FROM system.pg_catalog.pg_attribute
WHERE attrelid = 'system.public.jobs'::REGCLASS
AND attname = 'job_type'`
row, err := txn.QueryRowEx(ctx, "job-columns-get", txn.KV(), override, pgAttributeStmt)
if err != nil {
return err
}
hasJobTypeColumn = row != nil
}
if !hasJobTypeColumn {
numCols -= 1
}
insertStmt := fmt.Sprintf(`INSERT INTO system.jobs (%s) VALUES (%s)`,
strings.Join(cols[:numCols], ","), placeholders())
_, err = txn.ExecEx(
ctx, "job-row-insert", txn.KV(),
override,
113 changes: 74 additions & 39 deletions pkg/server/server_sql.go
@@ -89,6 +89,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sessioninit"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
@@ -443,6 +444,33 @@ func (s *stopperSessionEventListener) OnSessionDeleted(
return false
}

type refreshInstanceSessionListener struct {
cfg *sqlServerArgs
}

var _ slinstance.SessionEventListener = &stopperSessionEventListener{}

// OnSessionDeleted implements the slinstance.SessionEventListener interface.
func (r *refreshInstanceSessionListener) OnSessionDeleted(
ctx context.Context,
) (createAnotherSession bool) {
if err := r.cfg.stopper.RunAsyncTask(ctx, "refresh-instance-session", func(context.Context) {
nodeID, _ := r.cfg.nodeIDContainer.OptionalNodeID()
s, err := r.cfg.sqlLivenessProvider.Session(ctx)
if err != nil {
log.Errorf(ctx, "faild to get new liveness session ID: %v", err)
}
if _, err := r.cfg.sqlInstanceStorage.CreateNodeInstance(
ctx, s.ID(), s.Expiration(), r.cfg.AdvertiseAddr, r.cfg.SQLAdvertiseAddr, r.cfg.Locality, nodeID,
); err != nil {
log.Errorf(ctx, "failed to update instance with new session ID: %v", err)
}
}); err != nil {
log.Errorf(ctx, "failed to run update of instance with new session ID: %v", err)
}
return true
}

// newSQLServer constructs a new SQLServer. The caller is responsible for
// listening to the server's ShutdownRequested() channel (which is the same as
// cfg.stopTrigger.C()) and stopping cfg.stopper when signaled.
@@ -483,37 +511,36 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
// instance provider without initializing the instance, since this is not a
// SQL pod server.
_, isMixedSQLAndKVNode := cfg.nodeIDContainer.OptionalNodeID()
isSQLPod := !isMixedSQLAndKVNode

sqllivenessKnobs, _ := cfg.TestingKnobs.SQLLivenessKnobs.(*sqlliveness.TestingKnobs)
var sessionEventsConsumer slinstance.SessionEventListener
if isSQLPod {
if !isMixedSQLAndKVNode {
// For SQL pods, we want the process to shutdown when the session liveness
// record is found to be deleted. This is because, if the session is
// deleted, the instance ID used by this server may have been stolen by
// another server, or it may be stolen in the future. This server shouldn't
// use the instance ID anymore, and there's no mechanism for allocating a
// new one after startup.
sessionEventsConsumer = &stopperSessionEventListener{trigger: cfg.stopTrigger}
} else {
sessionEventsConsumer = &refreshInstanceSessionListener{cfg: &cfg}
}
cfg.sqlLivenessProvider = slprovider.New(
cfg.AmbientCtx,
cfg.stopper, cfg.clock, cfg.db, codec, cfg.Settings, sqllivenessKnobs, sessionEventsConsumer,
)

if isSQLPod {
if codec.ForSystemTenant() {
return nil, errors.AssertionFailedf("non-system codec used for SQL pod")
}

cfg.sqlInstanceStorage = instancestorage.NewStorage(
cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings)
cfg.sqlInstanceReader = instancestorage.NewReader(
cfg.sqlInstanceStorage,
cfg.sqlLivenessProvider,
cfg.rangeFeedFactory,
codec, cfg.clock, cfg.stopper)
cfg.sqlInstanceStorage = instancestorage.NewStorage(
cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings)
cfg.sqlInstanceReader = instancestorage.NewReader(
cfg.sqlInstanceStorage,
cfg.sqlLivenessProvider,
cfg.rangeFeedFactory,
codec, cfg.clock, cfg.stopper)

if isMixedSQLAndKVNode {
cfg.podNodeDialer = cfg.nodeDialer
} else {
// In a multi-tenant environment, use the sqlInstanceReader to resolve
// SQL pod addresses.
addressResolver := func(nodeID roachpb.NodeID) (net.Addr, error) {
@@ -524,11 +551,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
return &util.UnresolvedAddr{AddressField: info.InstanceRPCAddr}, nil
}
cfg.podNodeDialer = nodedialer.New(cfg.rpcContext, addressResolver)
} else {
if !codec.ForSystemTenant() {
return nil, errors.AssertionFailedf("system codec used for SQL-only node")
}
cfg.podNodeDialer = cfg.nodeDialer
}

jobRegistry := cfg.circularJobRegistry
@@ -818,8 +840,10 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
// cluster.
var getNodes func(ctx context.Context) ([]roachpb.NodeID, error)
var nodeDialer *nodedialer.Dialer
if !isSQLPod {
if isMixedSQLAndKVNode {
nodeDialer = cfg.nodeDialer
// TODO(dt): any reason not to just always use the instance reader? And just
// pass it directly instead of making a new closure here?
getNodes = func(ctx context.Context) ([]roachpb.NodeID, error) {
var ns []roachpb.NodeID
ls, err := nodeLiveness.GetLivenessesFromKV(ctx)
@@ -1339,26 +1363,37 @@ func (s *SQLServer) preStart(
// Start the sql liveness subsystem. We'll need it to get a session.
s.sqlLivenessProvider.Start(ctx, regionPhysicalRep)

_, isMixedSQLAndKVNode := s.sqlIDContainer.OptionalNodeID()
isTenant := !isMixedSQLAndKVNode
session, err := s.sqlLivenessProvider.Session(ctx)
if err != nil {
return err
}
// Start instance ID reclaim loop.
if err := s.sqlInstanceStorage.RunInstanceIDReclaimLoop(
ctx, stopper, timeutil.DefaultTimeSource{}, s.internalDB, session.Expiration,
); err != nil {
return err
}
nodeId, isMixedSQLAndKVNode := s.sqlIDContainer.OptionalNodeID()

if isTenant {
session, err := s.sqlLivenessProvider.Session(ctx)
var instance sqlinstance.InstanceInfo
if isMixedSQLAndKVNode {
// Write/acquire our instance row.
instance, err = s.sqlInstanceStorage.CreateNodeInstance(
ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.cfg.SQLAdvertiseAddr, s.distSQLServer.Locality, nodeId,
)
if err != nil {
return err
}
// Start instance ID reclaim loop.
if err := s.sqlInstanceStorage.RunInstanceIDReclaimLoop(
ctx, stopper, timeutil.DefaultTimeSource{}, s.internalDB, session.Expiration,
); err != nil {
return err
}
// Acquire our instance row.
instance, err := s.sqlInstanceStorage.CreateInstance(
ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.cfg.SQLAdvertiseAddr, s.distSQLServer.Locality)
} else {
instance, err = s.sqlInstanceStorage.CreateInstance(
ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.cfg.SQLAdvertiseAddr, s.distSQLServer.Locality,
)
if err != nil {
return err
}
}

if !isMixedSQLAndKVNode {
// TODO(andrei): Release the instance ID on server shutdown. It is not trivial
// to determine where/when exactly to do that, though. Doing it after stopper
// quiescing doesn't work. Doing it too soon, for example as part of draining,
@@ -1372,13 +1407,13 @@
if err := s.setInstanceID(ctx, instance.InstanceID, session.ID()); err != nil {
return err
}
// Start the instance provider. This needs to come after we've allocated our
// instance ID because the instances reader needs to see our own instance;
// we might be the only SQL server available, especially when we have not
// received data from the rangefeed yet, and if the reader doesn't see
// it, we'd be unable to plan any queries.
s.sqlInstanceReader.Start(ctx, instance)
}
// Start the instance provider. This needs to come after we've allocated our
// instance ID because the instances reader needs to see our own instance;
// we might be the only SQL server available, especially when we have not
// received data from the rangefeed yet, and if the reader doesn't see
// it, we'd be unable to plan any queries.
s.sqlInstanceReader.Start(ctx, instance)

s.execCfg.GCJobNotifier.Start(ctx)
s.temporaryObjectCleaner.Start(ctx, stopper)
20 changes: 19 additions & 1 deletion pkg/sql/catalog/bootstrap/BUILD.bazel
@@ -1,11 +1,12 @@
load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data")
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "bootstrap",
srcs = [
"kv_writer.go",
"metadata.go",
"previous_release.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/catalog/bootstrap",
visibility = ["//visibility:public"],
@@ -31,4 +32,21 @@ go_library(
],
)

go_test(
name = "bootstrap_test",
srcs = ["bootstrap_test.go"],
args = ["-test.timeout=295s"],
data = glob(["testdata/**"]),
embed = [":bootstrap"],
deps = [
"//pkg/config/zonepb",
"//pkg/keys",
"//pkg/roachpb",
"//pkg/testutils/datapathutils",
"//pkg/util/leaktest",
"@com_github_cockroachdb_datadriven//:datadriven",
"@com_github_stretchr_testify//require",
],
)

get_x_data(name = "get_x_data")