From 33d0eed17858be5cce91265081e6a27609bed580 Mon Sep 17 00:00:00 2001
From: Steven Danna
Date: Wed, 27 Sep 2023 12:01:59 +0100
Subject: [PATCH] sql: PartitionSpans should only use healthy nodes in
 mixed-process mode

Previously, when running in mixed-process mode, the DistSQLPlanner's
PartitionSpans method would assume that it could directly assign a given
span to the SQLInstanceID that matches the NodeID of whatever replica the
current replica oracle returned, without regard to whether that SQL
instance was available.

This is different from the system tenant code paths, which proactively
check node health, and from the non-mixed-process MT code paths, which
use an eventually consistent view of healthy nodes. As a result,
operations that use PartitionSpans, such as BACKUP, could fail when a
node was down.

Here, we make the mixed-process case work more like the separate-process
case, in which we only use nodes returned by the instance reader; that
list should eventually exclude any down nodes. An alternative (or perhaps
an addition) would be to allow MT planning to do direct status checks,
more similar to how they are done for the system tenant.

Finally, this also adds another error to our list of non-permanent
errors: if we fail to find a SQL instance, we don't treat that as
permanent.

Fixes #111319

Release note (bug fix): When using a private preview of physical cluster
replication, in some circumstances the source cluster would be unable to
take backups when a source cluster node was unavailable.
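In sketch form, the new mixed-process planning behavior looks roughly like
the following. This is an illustrative condensation, not the code added to
pkg/sql/distsql_physical_planner.go below: the helper name
makeHealthyResolver is invented for this sketch, the reader argument is
assumed to satisfy the sqlinstance.AddressResolver interface the planner
already uses, and GetAllInstances, InstanceInfo.InstanceID, and the
fall-back-to-the-gateway behavior mirror the diff.

package sketch

import (
    "context"

    "github.com/cockroachdb/cockroach/pkg/base"
    "github.com/cockroachdb/cockroach/pkg/roachpb"
    "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
)

// makeHealthyResolver maps a KV node ID to a SQL instance ID, but only if
// that instance appears in the (eventually consistent) list of live
// instances reported by the instance reader; otherwise it falls back to
// the gateway instance.
func makeHealthyResolver(
    ctx context.Context,
    reader sqlinstance.AddressResolver,
    gateway base.SQLInstanceID,
) func(roachpb.NodeID) base.SQLInstanceID {
    instances, err := reader.GetAllInstances(ctx)
    if err != nil {
        // Without a view of healthy instances, plan everything on the gateway.
        return func(roachpb.NodeID) base.SQLInstanceID { return gateway }
    }
    healthy := make(map[base.SQLInstanceID]struct{}, len(instances))
    for _, in := range instances {
        healthy[in.InstanceID] = struct{}{}
    }
    return func(nodeID roachpb.NodeID) base.SQLInstanceID {
        // In mixed-process mode the SQL instance ID equals the KV node ID.
        id := base.SQLInstanceID(nodeID)
        if _, ok := healthy[id]; ok {
            return id
        }
        return gateway
    }
}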
---
 pkg/ccl/backupccl/BUILD.bazel           |  1 +
 pkg/ccl/backupccl/backup_tenant_test.go | 71 +++++++++++++++++++++++++
 pkg/jobs/joberror/BUILD.bazel           |  1 +
 pkg/jobs/joberror/errors.go             |  5 +-
 pkg/sql/distsql_physical_planner.go     | 70 +++++++++++++++++-------
 5 files changed, 127 insertions(+), 21 deletions(-)

diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel
index 92c0b5e96c49..a9d0e63a3714 100644
--- a/pkg/ccl/backupccl/BUILD.bazel
+++ b/pkg/ccl/backupccl/BUILD.bazel
@@ -241,6 +241,7 @@ go_test(
         "//pkg/kv/kvserver/protectedts/ptpb",
         "//pkg/kv/kvserver/protectedts/ptutil",
         "//pkg/multitenant/mtinfopb",
+        "//pkg/multitenant/tenantcapabilities",
         "//pkg/roachpb",
         "//pkg/scheduledjobs",
         "//pkg/scheduledjobs/schedulebase",
diff --git a/pkg/ccl/backupccl/backup_tenant_test.go b/pkg/ccl/backupccl/backup_tenant_test.go
index 3556bc7df150..ca4292a935f6 100644
--- a/pkg/ccl/backupccl/backup_tenant_test.go
+++ b/pkg/ccl/backupccl/backup_tenant_test.go
@@ -18,6 +18,7 @@ import (
     _ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
     "github.com/cockroachdb/cockroach/pkg/jobs"
     "github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
+    "github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
     "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/sql"
     _ "github.com/cockroachdb/cockroach/pkg/sql/importer"
@@ -29,10 +30,80 @@ import (
     "github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
     "github.com/cockroachdb/cockroach/pkg/util/leaktest"
     "github.com/cockroachdb/cockroach/pkg/util/log"
+    "github.com/cockroachdb/cockroach/pkg/util/timeutil"
     "github.com/cockroachdb/errors"
     "github.com/stretchr/testify/require"
 )
 
+func TestBackupSharedProcessTenantNodeDown(t *testing.T) {
+    defer leaktest.AfterTest(t)()
+    defer log.Scope(t).Close(t)
+
+    ctx := context.Background()
+
+    skip.UnderRace(t, "multi-node, multi-tenant test too slow under race")
+    params := base.TestClusterArgs{
+        ServerArgs: base.TestServerArgs{
+            DisableDefaultTestTenant: true,
+        },
+    }
+    params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
+    tc, hostDB, _, cleanup := backupRestoreTestSetupWithParams(t, multiNode, 0, /* numAccounts */
+        InitManualReplication, params)
+    defer cleanup()
+
+    hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING sql.split_at.allow_for_secondary_tenant.enabled=true")
+    hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING sql.scatter.allow_for_secondary_tenant.enabled=true")
+    hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING server.sqlliveness.ttl='2s'")
+    hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING server.sqlliveness.heartbeat='250ms'")
+
+    testTenantID := roachpb.MustMakeTenantID(11)
+    _, tenantDB, err := tc.Server(0).StartSharedProcessTenant(ctx,
+        base.TestSharedProcessTenantArgs{
+            TenantID:   testTenantID,
+            TenantName: "test",
+            Knobs:      base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()},
+        })
+    require.NoError(t, err)
+
+    hostDB.Exec(t, "ALTER TENANT test GRANT ALL CAPABILITIES")
+    tc.WaitForTenantCapabilities(t, testTenantID, map[tenantcapabilities.ID]string{
+        tenantcapabilities.CanUseNodelocalStorage: "true",
+        tenantcapabilities.CanAdminSplit:          "true",
+        tenantcapabilities.CanAdminScatter:        "true",
+    })
+    require.NoError(t, err)
+
+    tenantSQL := sqlutils.MakeSQLRunner(tenantDB)
+    tenantSQL.Exec(t, "CREATE TABLE foo AS SELECT generate_series(1, 4000)")
+    tenantSQL.Exec(t, "ALTER TABLE foo SPLIT AT VALUES (500), (1000), (1500), (2000), (2500), (3000)")
+    tenantSQL.Exec(t, "ALTER TABLE foo SCATTER")
+
+    t.Log("waiting for SQL instances")
+    waitStart := timeutil.Now()
+    for i := 1; i < multiNode; i++ {
+        sqlAddr := tc.Server(i).ServingSQLAddr()
+        testutils.SucceedsSoon(t, func() error {
+            t.Logf("waiting for server %d", i)
+            db, err := serverutils.OpenDBConnE(sqlAddr, "cluster:test/defaultdb", false, tc.Stopper())
+            if err != nil {
+                return err
+            }
+            return db.Ping()
+        })
+    }
+    t.Logf("all SQL instances (took %s)", timeutil.Since(waitStart))
+
+    // Shut down a node.
+    t.Log("shutting down server 2 (n3)")
+    tc.StopServer(2)
+
+    tenantRunner := sqlutils.MakeSQLRunner(tenantDB)
+    var jobID jobspb.JobID
+    tenantRunner.QueryRow(t, "BACKUP INTO 'nodelocal://1/worker-failure' WITH detached").Scan(&jobID)
+    jobutils.WaitForJobToSucceed(t, tenantRunner, jobID)
+}
+
 func TestBackupTenantImportingTable(t *testing.T) {
     defer leaktest.AfterTest(t)()
     defer log.Scope(t).Close(t)
+ t.Log("shutting down server 2 (n3)") + tc.StopServer(2) + + tenantRunner := sqlutils.MakeSQLRunner(tenantDB) + var jobID jobspb.JobID + tenantRunner.QueryRow(t, "BACKUP INTO 'nodelocal://1/worker-failure' WITH detached").Scan(&jobID) + jobutils.WaitForJobToSucceed(t, tenantRunner, jobID) +} + func TestBackupTenantImportingTable(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/jobs/joberror/BUILD.bazel b/pkg/jobs/joberror/BUILD.bazel index 4a0cc63b885b..6a73ba8c1623 100644 --- a/pkg/jobs/joberror/BUILD.bazel +++ b/pkg/jobs/joberror/BUILD.bazel @@ -8,6 +8,7 @@ go_library( deps = [ "//pkg/kv/kvclient/kvcoord", "//pkg/sql/flowinfra", + "//pkg/sql/sqlinstance", "//pkg/util/circuit", "//pkg/util/grpcutil", "//pkg/util/sysutil", diff --git a/pkg/jobs/joberror/errors.go b/pkg/jobs/joberror/errors.go index 96fb3858a3de..848ac9e2fbc2 100644 --- a/pkg/jobs/joberror/errors.go +++ b/pkg/jobs/joberror/errors.go @@ -16,6 +16,7 @@ import ( circuitbreaker "github.com/cockroachdb/circuitbreaker" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/sql/flowinfra" + "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance" "github.com/cockroachdb/cockroach/pkg/util/circuit" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/sysutil" @@ -64,5 +65,7 @@ func IsPermanentBulkJobError(err error) bool { !kvcoord.IsSendError(err) && !isBreakerOpenError(err) && !sysutil.IsErrConnectionReset(err) && - !sysutil.IsErrConnectionRefused(err) + !sysutil.IsErrConnectionRefused(err) && + !errors.Is(err, sqlinstance.NonExistentInstanceError) + } diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index db329268275c..c09a08d448a1 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -1303,7 +1303,7 @@ func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem( ) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) { nodeMap := make(map[base.SQLInstanceID]int) resolver := func(nodeID roachpb.NodeID) base.SQLInstanceID { - return dsp.deprecatedSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID) + return dsp.healthySQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID) } for _, span := range spans { var err error @@ -1326,7 +1326,7 @@ func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem( func (dsp *DistSQLPlanner) partitionSpans( ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans, ) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) { - resolver, instances, err := dsp.makeInstanceResolver(ctx, planCtx.localityFilter) + resolver, instances, err := dsp.makeInstanceResolver(ctx, planCtx) if err != nil { return nil, false, err } @@ -1363,11 +1363,11 @@ func (dsp *DistSQLPlanner) partitionSpans( return partitions, ignoreMisplannedRanges, nil } -// deprecatedSQLInstanceIDForKVNodeIDSystem returns the SQL instance that should -// handle the range with the given node ID when planning is done on behalf of -// the system tenant. It ensures that the chosen SQL instance is healthy and of -// the compatible DistSQL version. -func (dsp *DistSQLPlanner) deprecatedSQLInstanceIDForKVNodeIDSystem( +// healthySQLInstanceIDForKVNodeIDSystem returns the SQL instance that +// should handle the range with the given node ID when planning is +// done on behalf of the system tenant. It ensures that the chosen SQL +// instance is healthy and of the compatible DistSQL version. 
diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index db329268275c..c09a08d448a1 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -1303,7 +1303,7 @@ func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem(
 ) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) {
     nodeMap := make(map[base.SQLInstanceID]int)
     resolver := func(nodeID roachpb.NodeID) base.SQLInstanceID {
-        return dsp.deprecatedSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID)
+        return dsp.healthySQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID)
     }
     for _, span := range spans {
         var err error
@@ -1326,7 +1326,7 @@ func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem(
 func (dsp *DistSQLPlanner) partitionSpans(
     ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans,
 ) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) {
-    resolver, instances, err := dsp.makeInstanceResolver(ctx, planCtx.localityFilter)
+    resolver, instances, err := dsp.makeInstanceResolver(ctx, planCtx)
     if err != nil {
         return nil, false, err
     }
@@ -1363,11 +1363,11 @@ func (dsp *DistSQLPlanner) partitionSpans(
     return partitions, ignoreMisplannedRanges, nil
 }
 
-// deprecatedSQLInstanceIDForKVNodeIDSystem returns the SQL instance that should
-// handle the range with the given node ID when planning is done on behalf of
-// the system tenant. It ensures that the chosen SQL instance is healthy and of
-// the compatible DistSQL version.
-func (dsp *DistSQLPlanner) deprecatedSQLInstanceIDForKVNodeIDSystem(
+// healthySQLInstanceIDForKVNodeIDSystem returns the SQL instance that
+// should handle the range with the given node ID when planning is
+// done on behalf of the system tenant. It ensures that the chosen SQL
+// instance is healthy and of the compatible DistSQL version.
+func (dsp *DistSQLPlanner) healthySQLInstanceIDForKVNodeIDSystem(
     ctx context.Context, planCtx *PlanningCtx, nodeID roachpb.NodeID,
 ) base.SQLInstanceID {
     sqlInstanceID := base.SQLInstanceID(nodeID)
@@ -1382,15 +1382,39 @@ func (dsp *DistSQLPlanner) deprecatedSQLInstanceIDForKVNodeIDSystem(
     return sqlInstanceID
 }
 
-// instanceIDForKVNodeHostedInstance returns the SQL instance ID for an
-// instance that is hosted in the process of a KV node. Currently SQL
+// healthySQLInstanceIDForKVNodeHostedInstanceResolver returns the SQL instance ID for
+// an instance that is hosted in the process of a KV node. Currently SQL
 // instances run in KV node processes have IDs fixed to be equal to the KV
-// nodes' IDs, and all of the SQL instances for a given tenant are _either_
-// run in this mixed mode or standalone, meaning if this server is in mixed
-// mode, we can safely assume every other server is as well, and thus has
-// IDs matching node IDs.
-func instanceIDForKVNodeHostedInstance(nodeID roachpb.NodeID) base.SQLInstanceID {
-    return base.SQLInstanceID(nodeID)
+// nodes' IDs, and all of the SQL instances for a given tenant are _either_ run
+// in this mixed mode or standalone, meaning if this server is in mixed mode, we
+// can safely assume every other server is as well, and thus has IDs matching
+// node IDs.
+//
+// If the given node is not healthy, the gateway node is returned.
+func (dsp *DistSQLPlanner) healthySQLInstanceIDForKVNodeHostedInstanceResolver(
+    ctx context.Context, planCtx *PlanningCtx,
+) func(nodeID roachpb.NodeID) base.SQLInstanceID {
+    allHealthy, err := dsp.sqlAddressResolver.GetAllInstances(ctx)
+    if err != nil {
+        log.Warningf(ctx, "could not get all instances: %v", err)
+        return func(nodeID roachpb.NodeID) base.SQLInstanceID {
+            return dsp.gatewaySQLInstanceID
+        }
+    }
+
+    healthyNodes := make(map[base.SQLInstanceID]struct{}, len(allHealthy))
+    for _, n := range allHealthy {
+        healthyNodes[n.InstanceID] = struct{}{}
+    }
+
+    return func(nodeID roachpb.NodeID) base.SQLInstanceID {
+        sqlInstance := base.SQLInstanceID(nodeID)
+        if _, ok := healthyNodes[sqlInstance]; ok {
+            return sqlInstance
+        }
+        log.Warningf(ctx, "not planning on node %d", sqlInstance)
+        return dsp.gatewaySQLInstanceID
+    }
 }
 
 // makeInstanceResolver returns a function that can choose the SQL instance ID
@@ -1401,12 +1425,18 @@ func instanceIDForKVNodeHostedInstance(nodeID roachpb.NodeID) base.SQLInstanceID
 // If the instance was assigned statically or the instance list had no locality
 // information leading to random assignments then no instance list is returned.
 func (dsp *DistSQLPlanner) makeInstanceResolver(
-    ctx context.Context, locFilter roachpb.Locality,
+    ctx context.Context, planCtx *PlanningCtx,
 ) (func(roachpb.NodeID) base.SQLInstanceID, []sqlinstance.InstanceInfo, error) {
     _, mixedProcessMode := dsp.distSQLSrv.NodeID.OptionalNodeID()
+    locFilter := planCtx.localityFilter
+
+    var mixedProcessSameNodeResolver func(nodeID roachpb.NodeID) base.SQLInstanceID
+    if mixedProcessMode {
+        mixedProcessSameNodeResolver = dsp.healthySQLInstanceIDForKVNodeHostedInstanceResolver(ctx, planCtx)
+    }
 
     if mixedProcessMode && locFilter.Empty() {
-        return instanceIDForKVNodeHostedInstance, nil, nil
+        return mixedProcessSameNodeResolver, nil, nil
     }
 
     // GetAllInstances only returns healthy instances.
@@ -1464,7 +1494,7 @@ func (dsp *DistSQLPlanner) makeInstanceResolver(
         // locality filter in which case we can just use it.
         if mixedProcessMode {
             if ok, _ := nodeDesc.Locality.Matches(locFilter); ok {
-                return instanceIDForKVNodeHostedInstance(nodeID)
+                return mixedProcessSameNodeResolver(nodeID)
             } else {
                 log.VEventf(ctx, 2,
                     "node %d locality %s does not match locality filter %s, finding alternative placement...",
@@ -1606,9 +1636,9 @@ func (dsp *DistSQLPlanner) getInstanceIDForScan(
     }
 
     if dsp.useGossipPlanning(ctx, planCtx) && planCtx.localityFilter.Empty() {
-        return dsp.deprecatedSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID), nil
+        return dsp.healthySQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID), nil
     }
-    resolver, _, err := dsp.makeInstanceResolver(ctx, planCtx.localityFilter)
+    resolver, _, err := dsp.makeInstanceResolver(ctx, planCtx)
     if err != nil {
         return 0, err
     }