release-23.1: sql: PartitionSpan should only use healthy nodes in mixed-process mode #113171

Merged
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
@@ -241,6 +241,7 @@ go_test(
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/kv/kvserver/protectedts/ptutil",
"//pkg/multitenant/mtinfopb",
"//pkg/multitenant/tenantcapabilities",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/scheduledjobs/schedulebase",
71 changes: 71 additions & 0 deletions pkg/ccl/backupccl/backup_tenant_test.go
@@ -18,6 +18,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
_ "github.com/cockroachdb/cockroach/pkg/sql/importer"
@@ -29,10 +30,80 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func TestBackupSharedProcessTenantNodeDown(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

skip.UnderRace(t, "multi-node, multi-tenant test too slow under race")
params := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
DisableDefaultTestTenant: true,
},
}
params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
tc, hostDB, _, cleanup := backupRestoreTestSetupWithParams(t, multiNode, 0, /* numAccounts */
InitManualReplication, params)
defer cleanup()

hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING sql.split_at.allow_for_secondary_tenant.enabled=true")
hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING sql.scatter.allow_for_secondary_tenant.enabled=true")
hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING server.sqlliveness.ttl='2s'")
hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING server.sqlliveness.heartbeat='250ms'")

testTenantID := roachpb.MustMakeTenantID(11)
_, tenantDB, err := tc.Server(0).StartSharedProcessTenant(ctx,
base.TestSharedProcessTenantArgs{
TenantID: testTenantID,
TenantName: "test",
Knobs: base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()},
})
require.NoError(t, err)

hostDB.Exec(t, "ALTER TENANT test GRANT ALL CAPABILITIES")
tc.WaitForTenantCapabilities(t, testTenantID, map[tenantcapabilities.ID]string{
tenantcapabilities.CanUseNodelocalStorage: "true",
tenantcapabilities.CanAdminSplit: "true",
tenantcapabilities.CanAdminScatter: "true",
})
require.NoError(t, err)

tenantSQL := sqlutils.MakeSQLRunner(tenantDB)
tenantSQL.Exec(t, "CREATE TABLE foo AS SELECT generate_series(1, 4000)")
tenantSQL.Exec(t, "ALTER TABLE foo SPLIT AT VALUES (500), (1000), (1500), (2000), (2500), (3000)")
tenantSQL.Exec(t, "ALTER TABLE foo SCATTER")

t.Log("waiting for SQL instances")
waitStart := timeutil.Now()
for i := 1; i < multiNode; i++ {
sqlAddr := tc.Server(i).ServingSQLAddr()
testutils.SucceedsSoon(t, func() error {
t.Logf("waiting for server %d", i)
db, err := serverutils.OpenDBConnE(sqlAddr, "cluster:test/defaultdb", false, tc.Stopper())
if err != nil {
return err
}
return db.Ping()
})
}
t.Logf("all SQL instances (took %s)", timeutil.Since(waitStart))

// Shut down a node.
t.Log("shutting down server 2 (n3)")
tc.StopServer(2)

tenantRunner := sqlutils.MakeSQLRunner(tenantDB)
var jobID jobspb.JobID
tenantRunner.QueryRow(t, "BACKUP INTO 'nodelocal://1/worker-failure' WITH detached").Scan(&jobID)
jobutils.WaitForJobToSucceed(t, tenantRunner, jobID)
Contributor Author:

@stevendanna I changed the end of this test a little bit because I saw a flake of the form shown in the screenshot below. I don't think it changes the essence of the test, but if that computed timeout was an assertion in itself, let me know and I'll try to add it back in.

[Screenshot 2023-10-26 at 9:30:18 PM]

Collaborator:

Seems like a good change.

}

func TestBackupTenantImportingTable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
1 change: 1 addition & 0 deletions pkg/jobs/joberror/BUILD.bazel
@@ -8,6 +8,7 @@ go_library(
deps = [
"//pkg/kv/kvclient/kvcoord",
"//pkg/sql/flowinfra",
"//pkg/sql/sqlinstance",
"//pkg/util/circuit",
"//pkg/util/grpcutil",
"//pkg/util/sysutil",
5 changes: 4 additions & 1 deletion pkg/jobs/joberror/errors.go
@@ -16,6 +16,7 @@ import (
circuitbreaker "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/sysutil"
@@ -64,5 +65,7 @@ func IsPermanentBulkJobError(err error) bool {
!kvcoord.IsSendError(err) &&
!isBreakerOpenError(err) &&
!sysutil.IsErrConnectionReset(err) &&
!sysutil.IsErrConnectionRefused(err)
!sysutil.IsErrConnectionRefused(err) &&
!errors.Is(err, sqlinstance.NonExistentInstanceError)

}
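Not part of the diff: a minimal sketch of how a caller might consume the updated IsPermanentBulkJobError. The package and the maybeMarkRetryable helper are hypothetical; the sketch assumes the jobs package's MarkAsRetryJobError helper for marking an error as job-retryable.

package joberrorexample

import (
	"github.com/cockroachdb/cockroach/pkg/jobs"
	"github.com/cockroachdb/cockroach/pkg/jobs/joberror"
)

// maybeMarkRetryable classifies an error from a bulk job flow: permanent
// failures are returned as-is and fail the job, while transient ones (send
// errors, connection resets, and now a non-existent SQL instance) are
// marked retryable so the job is re-run rather than failed outright.
func maybeMarkRetryable(err error) error {
	if err == nil {
		return nil
	}
	if joberror.IsPermanentBulkJobError(err) {
		return err
	}
	return jobs.MarkAsRetryJobError(err)
}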
70 changes: 50 additions & 20 deletions pkg/sql/distsql_physical_planner.go
@@ -1303,7 +1303,7 @@ func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem(
) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) {
nodeMap := make(map[base.SQLInstanceID]int)
resolver := func(nodeID roachpb.NodeID) base.SQLInstanceID {
return dsp.deprecatedSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID)
return dsp.healthySQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID)
}
for _, span := range spans {
var err error
@@ -1326,7 +1326,7 @@ func (dsp *DistSQLPlanner) partitionSpans(
func (dsp *DistSQLPlanner) partitionSpans(
ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans,
) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) {
resolver, instances, err := dsp.makeInstanceResolver(ctx, planCtx.localityFilter)
resolver, instances, err := dsp.makeInstanceResolver(ctx, planCtx)
if err != nil {
return nil, false, err
}
@@ -1363,11 +1363,11 @@ return partitions, ignoreMisplannedRanges, nil
return partitions, ignoreMisplannedRanges, nil
}

// deprecatedSQLInstanceIDForKVNodeIDSystem returns the SQL instance that should
// handle the range with the given node ID when planning is done on behalf of
// the system tenant. It ensures that the chosen SQL instance is healthy and of
// the compatible DistSQL version.
func (dsp *DistSQLPlanner) deprecatedSQLInstanceIDForKVNodeIDSystem(
// healthySQLInstanceIDForKVNodeIDSystem returns the SQL instance that
// should handle the range with the given node ID when planning is
// done on behalf of the system tenant. It ensures that the chosen SQL
// instance is healthy and of the compatible DistSQL version.
func (dsp *DistSQLPlanner) healthySQLInstanceIDForKVNodeIDSystem(
ctx context.Context, planCtx *PlanningCtx, nodeID roachpb.NodeID,
) base.SQLInstanceID {
sqlInstanceID := base.SQLInstanceID(nodeID)
@@ -1382,15 +1382,39 @@ func (dsp *DistSQLPlanner) deprecatedSQLInstanceIDForKVNodeIDSystem(
return sqlInstanceID
}

// instanceIDForKVNodeHostedInstance returns the SQL instance ID for an
// instance that is hosted in the process of a KV node. Currently SQL
// healthySQLInstanceIDForKVNodeHostedInstanceResolver returns the SQL instance ID for
// an instance that is hosted in the process of a KV node. Currently SQL
// instances run in KV node processes have IDs fixed to be equal to the KV
// nodes' IDs, and all of the SQL instances for a given tenant are _either_
// run in this mixed mode or standalone, meaning if this server is in mixed
// mode, we can safely assume every other server is as well, and thus has
// IDs matching node IDs.
func instanceIDForKVNodeHostedInstance(nodeID roachpb.NodeID) base.SQLInstanceID {
return base.SQLInstanceID(nodeID)
// nodes' IDs, and all of the SQL instances for a given tenant are _either_ run
// in this mixed mode or standalone, meaning if this server is in mixed mode, we
// can safely assume every other server is as well, and thus has IDs matching
// node IDs.
//
// If the given node is not healthy, the gateway node is returned.
func (dsp *DistSQLPlanner) healthySQLInstanceIDForKVNodeHostedInstanceResolver(
ctx context.Context, planCtx *PlanningCtx,
) func(nodeID roachpb.NodeID) base.SQLInstanceID {
allHealthy, err := dsp.sqlAddressResolver.GetAllInstances(ctx)
if err != nil {
log.Warningf(ctx, "could not get all instances: %v", err)
return func(nodeID roachpb.NodeID) base.SQLInstanceID {
return dsp.gatewaySQLInstanceID
}
}

healthyNodes := make(map[base.SQLInstanceID]struct{}, len(allHealthy))
for _, n := range allHealthy {
healthyNodes[n.InstanceID] = struct{}{}
}

return func(nodeID roachpb.NodeID) base.SQLInstanceID {
sqlInstance := base.SQLInstanceID(nodeID)
if _, ok := healthyNodes[sqlInstance]; ok {
return sqlInstance
}
log.Warningf(ctx, "not planning on node %d", sqlInstance)
return dsp.gatewaySQLInstanceID
}
}

// makeInstanceResolver returns a function that can choose the SQL instance ID
@@ -1401,12 +1425,18 @@ func instanceIDForKVNodeHostedInstance(nodeID roachpb.NodeID) base.SQLInstanceID
// If the instance was assigned statically or the instance list had no locality
// information leading to random assignments then no instance list is returned.
func (dsp *DistSQLPlanner) makeInstanceResolver(
ctx context.Context, locFilter roachpb.Locality,
ctx context.Context, planCtx *PlanningCtx,
) (func(roachpb.NodeID) base.SQLInstanceID, []sqlinstance.InstanceInfo, error) {
_, mixedProcessMode := dsp.distSQLSrv.NodeID.OptionalNodeID()
locFilter := planCtx.localityFilter

var mixedProcessSameNodeResolver func(nodeID roachpb.NodeID) base.SQLInstanceID
if mixedProcessMode {
mixedProcessSameNodeResolver = dsp.healthySQLInstanceIDForKVNodeHostedInstanceResolver(ctx, planCtx)
}

if mixedProcessMode && locFilter.Empty() {
return instanceIDForKVNodeHostedInstance, nil, nil
return mixedProcessSameNodeResolver, nil, nil
}

// GetAllInstances only returns healthy instances.
@@ -1464,7 +1494,7 @@ func (dsp *DistSQLPlanner) makeInstanceResolver(
// locality filter in which case we can just use it.
if mixedProcessMode {
if ok, _ := nodeDesc.Locality.Matches(locFilter); ok {
return instanceIDForKVNodeHostedInstance(nodeID)
return mixedProcessSameNodeResolver(nodeID)
} else {
log.VEventf(ctx, 2,
"node %d locality %s does not match locality filter %s, finding alternative placement...",
@@ -1606,9 +1636,9 @@ func (dsp *DistSQLPlanner) getInstanceIDForScan(
}

if dsp.useGossipPlanning(ctx, planCtx) && planCtx.localityFilter.Empty() {
return dsp.deprecatedSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID), nil
return dsp.healthySQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID), nil
}
resolver, _, err := dsp.makeInstanceResolver(ctx, planCtx.localityFilter)
resolver, _, err := dsp.makeInstanceResolver(ctx, planCtx)
if err != nil {
return 0, err
}
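For illustration only, a simplified standalone sketch of the fallback pattern that healthySQLInstanceIDForKVNodeHostedInstanceResolver introduces; the types and names below are stand-ins, not the planner's actual code.

package resolverexample

type InstanceID int32
type NodeID int32

// makeHealthyResolver maps a KV node ID to the SQL instance hosted in the
// same process, but only if that instance appears in the healthy set;
// otherwise it routes the work to the gateway instance.
func makeHealthyResolver(healthy map[InstanceID]struct{}, gateway InstanceID) func(NodeID) InstanceID {
	return func(nodeID NodeID) InstanceID {
		// In mixed-process mode the SQL instance ID equals the KV node ID.
		id := InstanceID(nodeID)
		if _, ok := healthy[id]; ok {
			return id
		}
		return gateway
	}
}

For example, with instances 1 and 2 healthy and instance 1 as the gateway, spans whose leaseholder is on node 3 would be planned on instance 1 instead of the downed node.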