Skip to content

Commit

Permalink
Merge pull request #113171 from adityamaru/backport23.1-105256-111337
Browse files Browse the repository at this point in the history
release-23.1: sql: PartitionSpan should only use healthy nodes in mixed-process mode
  • Loading branch information
adityamaru authored Oct 30, 2023
2 parents 7952853 + 33d0eed commit 2da3818
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 21 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ go_test(
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/kv/kvserver/protectedts/ptutil",
"//pkg/multitenant/mtinfopb",
"//pkg/multitenant/tenantcapabilities",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/scheduledjobs/schedulebase",
Expand Down
71 changes: 71 additions & 0 deletions pkg/ccl/backupccl/backup_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
_ "github.com/cockroachdb/cockroach/pkg/sql/importer"
Expand All @@ -29,10 +30,80 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestBackupSharedProcessTenantNodeDown starts a shared-process tenant on a
// multi-node cluster, stops one of the servers, and then checks that a
// detached BACKUP issued by the tenant still runs to completion.
func TestBackupSharedProcessTenantNodeDown(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()

	skip.UnderRace(t, "multi-node, multi-tenant test too slow under race")
	params := base.TestClusterArgs{
		ServerArgs: base.TestServerArgs{
			DisableDefaultTestTenant: true,
		},
	}
	params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
	tc, hostDB, _, cleanup := backupRestoreTestSetupWithParams(t, multiNode, 0, /* numAccounts */
		InitManualReplication, params)
	defer cleanup()

	// Let secondary tenants split and scatter, and shorten the sqlliveness TTL
	// and heartbeat interval.
	// NOTE(review): presumably the short TTL lets the stopped node's SQL
	// instance expire quickly -- confirm.
	hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING sql.split_at.allow_for_secondary_tenant.enabled=true")
	hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING sql.scatter.allow_for_secondary_tenant.enabled=true")
	hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING server.sqlliveness.ttl='2s'")
	hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING server.sqlliveness.heartbeat='250ms'")

	testTenantID := roachpb.MustMakeTenantID(11)
	_, tenantDB, err := tc.Server(0).StartSharedProcessTenant(ctx,
		base.TestSharedProcessTenantArgs{
			TenantID:   testTenantID,
			TenantName: "test",
			Knobs:      base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()},
		})
	require.NoError(t, err)

	// Grant every capability and block until the ones this test relies on
	// (nodelocal storage, split, scatter) are reported as "true".
	hostDB.Exec(t, "ALTER TENANT test GRANT ALL CAPABILITIES")
	tc.WaitForTenantCapabilities(t, testTenantID, map[tenantcapabilities.ID]string{
		tenantcapabilities.CanUseNodelocalStorage: "true",
		tenantcapabilities.CanAdminSplit:          "true",
		tenantcapabilities.CanAdminScatter:        "true",
	})

	// Create a table with several ranges and scatter them.
	tenantSQL := sqlutils.MakeSQLRunner(tenantDB)
	tenantSQL.Exec(t, "CREATE TABLE foo AS SELECT generate_series(1, 4000)")
	tenantSQL.Exec(t, "ALTER TABLE foo SPLIT AT VALUES (500), (1000), (1500), (2000), (2500), (3000)")
	tenantSQL.Exec(t, "ALTER TABLE foo SCATTER")

	// Before stopping a node, wait until the tenant accepts SQL connections on
	// every server other than server 0 (where the tenant was started above).
	t.Log("waiting for SQL instances")
	waitStart := timeutil.Now()
	for i := 1; i < multiNode; i++ {
		sqlAddr := tc.Server(i).ServingSQLAddr()
		testutils.SucceedsSoon(t, func() error {
			t.Logf("waiting for server %d", i)
			db, err := serverutils.OpenDBConnE(sqlAddr, "cluster:test/defaultdb", false, tc.Stopper())
			if err != nil {
				return err
			}
			return db.Ping()
		})
	}
	t.Logf("all SQL instances (took %s)", timeutil.Since(waitStart))

	// Shut down a node.
	t.Log("shutting down server 2 (n3)")
	tc.StopServer(2)

	// The backup is started detached so that the job ID can be captured and
	// the job awaited explicitly.
	var jobID jobspb.JobID
	tenantSQL.QueryRow(t, "BACKUP INTO 'nodelocal://1/worker-failure' WITH detached").Scan(&jobID)
	jobutils.WaitForJobToSucceed(t, tenantSQL, jobID)
}

func TestBackupTenantImportingTable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/joberror/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
deps = [
"//pkg/kv/kvclient/kvcoord",
"//pkg/sql/flowinfra",
"//pkg/sql/sqlinstance",
"//pkg/util/circuit",
"//pkg/util/grpcutil",
"//pkg/util/sysutil",
Expand Down
5 changes: 4 additions & 1 deletion pkg/jobs/joberror/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
circuitbreaker "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/sysutil"
Expand Down Expand Up @@ -64,5 +65,7 @@ func IsPermanentBulkJobError(err error) bool {
!kvcoord.IsSendError(err) &&
!isBreakerOpenError(err) &&
!sysutil.IsErrConnectionReset(err) &&
!sysutil.IsErrConnectionRefused(err)
!sysutil.IsErrConnectionRefused(err) &&
!errors.Is(err, sqlinstance.NonExistentInstanceError)

}
70 changes: 50 additions & 20 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,7 +1303,7 @@ func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem(
) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) {
nodeMap := make(map[base.SQLInstanceID]int)
resolver := func(nodeID roachpb.NodeID) base.SQLInstanceID {
return dsp.deprecatedSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID)
return dsp.healthySQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID)
}
for _, span := range spans {
var err error
Expand All @@ -1326,7 +1326,7 @@ func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem(
func (dsp *DistSQLPlanner) partitionSpans(
ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans,
) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) {
resolver, instances, err := dsp.makeInstanceResolver(ctx, planCtx.localityFilter)
resolver, instances, err := dsp.makeInstanceResolver(ctx, planCtx)
if err != nil {
return nil, false, err
}
Expand Down Expand Up @@ -1363,11 +1363,11 @@ func (dsp *DistSQLPlanner) partitionSpans(
return partitions, ignoreMisplannedRanges, nil
}

// deprecatedSQLInstanceIDForKVNodeIDSystem returns the SQL instance that should
// handle the range with the given node ID when planning is done on behalf of
// the system tenant. It ensures that the chosen SQL instance is healthy and of
// the compatible DistSQL version.
func (dsp *DistSQLPlanner) deprecatedSQLInstanceIDForKVNodeIDSystem(
// healthySQLInstanceIDForKVNodeIDSystem returns the SQL instance that
// should handle the range with the given node ID when planning is
// done on behalf of the system tenant. It ensures that the chosen SQL
// instance is healthy and of the compatible DistSQL version.
func (dsp *DistSQLPlanner) healthySQLInstanceIDForKVNodeIDSystem(
ctx context.Context, planCtx *PlanningCtx, nodeID roachpb.NodeID,
) base.SQLInstanceID {
sqlInstanceID := base.SQLInstanceID(nodeID)
Expand All @@ -1382,15 +1382,39 @@ func (dsp *DistSQLPlanner) deprecatedSQLInstanceIDForKVNodeIDSystem(
return sqlInstanceID
}

// instanceIDForKVNodeHostedInstance returns the SQL instance ID for an
// instance that is hosted in the process of a KV node. Currently SQL
// healthySQLInstanceIDForKVNodeHostedInstanceResolver returns the SQL instance ID for
// an instance that is hosted in the process of a KV node. Currently SQL
// instances run in KV node processes have IDs fixed to be equal to the KV
// nodes' IDs, and all of the SQL instances for a given tenant are _either_
// run in this mixed mode or standalone, meaning if this server is in mixed
// mode, we can safely assume every other server is as well, and thus has
// IDs matching node IDs.
func instanceIDForKVNodeHostedInstance(nodeID roachpb.NodeID) base.SQLInstanceID {
return base.SQLInstanceID(nodeID)
// nodes' IDs, and all of the SQL instances for a given tenant are _either_ run
// in this mixed mode or standalone, meaning if this server is in mixed mode, we
// can safely assume every other server is as well, and thus has IDs matching
// node IDs.
//
// If the given node is not healthy, the gateway node is returned.
func (dsp *DistSQLPlanner) healthySQLInstanceIDForKVNodeHostedInstanceResolver(
	ctx context.Context, planCtx *PlanningCtx,
) func(nodeID roachpb.NodeID) base.SQLInstanceID {
	instances, err := dsp.sqlAddressResolver.GetAllInstances(ctx)
	if err != nil {
		// Without an instance list no node can be vouched for, so every
		// lookup resolves to the gateway.
		log.Warningf(ctx, "could not get all instances: %v", err)
		return func(roachpb.NodeID) base.SQLInstanceID {
			return dsp.gatewaySQLInstanceID
		}
	}

	// Index the returned instances by ID for constant-time membership checks.
	healthy := make(map[base.SQLInstanceID]struct{}, len(instances))
	for i := range instances {
		healthy[instances[i].InstanceID] = struct{}{}
	}

	return func(nodeID roachpb.NodeID) base.SQLInstanceID {
		// The SQL instance ID is taken to be the KV node ID.
		candidate := base.SQLInstanceID(nodeID)
		if _, found := healthy[candidate]; !found {
			log.Warningf(ctx, "not planning on node %d", candidate)
			return dsp.gatewaySQLInstanceID
		}
		return candidate
	}
}

// makeInstanceResolver returns a function that can choose the SQL instance ID
Expand All @@ -1401,12 +1425,18 @@ func instanceIDForKVNodeHostedInstance(nodeID roachpb.NodeID) base.SQLInstanceID
// If the instance was assigned statically or the instance list had no locality
// information leading to random assignments then no instance list is returned.
func (dsp *DistSQLPlanner) makeInstanceResolver(
ctx context.Context, locFilter roachpb.Locality,
ctx context.Context, planCtx *PlanningCtx,
) (func(roachpb.NodeID) base.SQLInstanceID, []sqlinstance.InstanceInfo, error) {
_, mixedProcessMode := dsp.distSQLSrv.NodeID.OptionalNodeID()
locFilter := planCtx.localityFilter

var mixedProcessSameNodeResolver func(nodeID roachpb.NodeID) base.SQLInstanceID
if mixedProcessMode {
mixedProcessSameNodeResolver = dsp.healthySQLInstanceIDForKVNodeHostedInstanceResolver(ctx, planCtx)
}

if mixedProcessMode && locFilter.Empty() {
return instanceIDForKVNodeHostedInstance, nil, nil
return mixedProcessSameNodeResolver, nil, nil
}

// GetAllInstances only returns healthy instances.
Expand Down Expand Up @@ -1464,7 +1494,7 @@ func (dsp *DistSQLPlanner) makeInstanceResolver(
// locality filter in which case we can just use it.
if mixedProcessMode {
if ok, _ := nodeDesc.Locality.Matches(locFilter); ok {
return instanceIDForKVNodeHostedInstance(nodeID)
return mixedProcessSameNodeResolver(nodeID)
} else {
log.VEventf(ctx, 2,
"node %d locality %s does not match locality filter %s, finding alternative placement...",
Expand Down Expand Up @@ -1606,9 +1636,9 @@ func (dsp *DistSQLPlanner) getInstanceIDForScan(
}

if dsp.useGossipPlanning(ctx, planCtx) && planCtx.localityFilter.Empty() {
return dsp.deprecatedSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID), nil
return dsp.healthySQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID), nil
}
resolver, _, err := dsp.makeInstanceResolver(ctx, planCtx.localityFilter)
resolver, _, err := dsp.makeInstanceResolver(ctx, planCtx)
if err != nil {
return 0, err
}
Expand Down

0 comments on commit 2da3818

Please sign in to comment.