Skip to content

Commit

Permalink
sql: PartitionSpan should only use healthy nodes in mixed-process mode
Browse files Browse the repository at this point in the history
Previously, when running in mixed-process mode, the DistSQLPlanner's
PartitionSpans method would assume that it could directly assign a
given span to the SQLInstanceID that matches the NodeID of whatever
replica the current replica oracle returned, without regard to whether
the SQL instance was available.

This is different from the system tenant code paths which proactively
check node health and the non-mixed-process MT code paths which would
use an eventually consistent view of healthy nodes.

As a result, processes that use PartitionSpans such as BACKUP may fail
when a node was down.

Here, we have the mixed-process case work more like the separate
process case in which we only use nodes returned by the instance
reader. This list should eventually exclude any down nodes.

An alternative (or perhaps an addition) would be to allow MT planning
to do direct status checks more similar to how they are done for the
system tenant.

Finally, this also adds another error to our list of non-permanent
errors. Namely, if we fail to find a SQL instance, we don't tread that
as permanent.

Fixes #111319

Release note (bug fix): When using a private preview of physical
cluster replication, in some circumstances the source cluster would be
unable to take backups when a source cluster node was unavailable.
  • Loading branch information
stevendanna authored and adityamaru committed Oct 27, 2023
1 parent a078e1c commit 33d0eed
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 21 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ go_test(
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/kv/kvserver/protectedts/ptutil",
"//pkg/multitenant/mtinfopb",
"//pkg/multitenant/tenantcapabilities",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/scheduledjobs/schedulebase",
Expand Down
71 changes: 71 additions & 0 deletions pkg/ccl/backupccl/backup_tenant_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
_ "github.com/cockroachdb/cockroach/pkg/cloud/impl" // register cloud storage providers
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/multitenant/tenantcapabilities"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
_ "github.com/cockroachdb/cockroach/pkg/sql/importer"
Expand All @@ -29,10 +30,80 @@ import (
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

func TestBackupSharedProcessTenantNodeDown(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

skip.UnderRace(t, "multi-node, multi-tenant test too slow under race")
params := base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
DisableDefaultTestTenant: true,
},
}
params.ServerArgs.Knobs.JobsTestingKnobs = jobs.NewTestingKnobsWithShortIntervals()
tc, hostDB, _, cleanup := backupRestoreTestSetupWithParams(t, multiNode, 0, /* numAccounts */
InitManualReplication, params)
defer cleanup()

hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING sql.split_at.allow_for_secondary_tenant.enabled=true")
hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING sql.scatter.allow_for_secondary_tenant.enabled=true")
hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING server.sqlliveness.ttl='2s'")
hostDB.Exec(t, "ALTER TENANT ALL SET CLUSTER SETTING server.sqlliveness.heartbeat='250ms'")

testTenantID := roachpb.MustMakeTenantID(11)
_, tenantDB, err := tc.Server(0).StartSharedProcessTenant(ctx,
base.TestSharedProcessTenantArgs{
TenantID: testTenantID,
TenantName: "test",
Knobs: base.TestingKnobs{JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals()},
})
require.NoError(t, err)

hostDB.Exec(t, "ALTER TENANT test GRANT ALL CAPABILITIES")
tc.WaitForTenantCapabilities(t, testTenantID, map[tenantcapabilities.ID]string{
tenantcapabilities.CanUseNodelocalStorage: "true",
tenantcapabilities.CanAdminSplit: "true",
tenantcapabilities.CanAdminScatter: "true",
})
require.NoError(t, err)

tenantSQL := sqlutils.MakeSQLRunner(tenantDB)
tenantSQL.Exec(t, "CREATE TABLE foo AS SELECT generate_series(1, 4000)")
tenantSQL.Exec(t, "ALTER TABLE foo SPLIT AT VALUES (500), (1000), (1500), (2000), (2500), (3000)")
tenantSQL.Exec(t, "ALTER TABLE foo SCATTER")

t.Log("waiting for SQL instances")
waitStart := timeutil.Now()
for i := 1; i < multiNode; i++ {
sqlAddr := tc.Server(i).ServingSQLAddr()
testutils.SucceedsSoon(t, func() error {
t.Logf("waiting for server %d", i)
db, err := serverutils.OpenDBConnE(sqlAddr, "cluster:test/defaultdb", false, tc.Stopper())
if err != nil {
return err
}
return db.Ping()
})
}
t.Logf("all SQL instances (took %s)", timeutil.Since(waitStart))

// Shut down a node.
t.Log("shutting down server 2 (n3)")
tc.StopServer(2)

tenantRunner := sqlutils.MakeSQLRunner(tenantDB)
var jobID jobspb.JobID
tenantRunner.QueryRow(t, "BACKUP INTO 'nodelocal://1/worker-failure' WITH detached").Scan(&jobID)
jobutils.WaitForJobToSucceed(t, tenantRunner, jobID)
}

func TestBackupTenantImportingTable(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
1 change: 1 addition & 0 deletions pkg/jobs/joberror/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ go_library(
deps = [
"//pkg/kv/kvclient/kvcoord",
"//pkg/sql/flowinfra",
"//pkg/sql/sqlinstance",
"//pkg/util/circuit",
"//pkg/util/grpcutil",
"//pkg/util/sysutil",
Expand Down
5 changes: 4 additions & 1 deletion pkg/jobs/joberror/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
circuitbreaker "github.com/cockroachdb/circuitbreaker"
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance"
"github.com/cockroachdb/cockroach/pkg/util/circuit"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/sysutil"
Expand Down Expand Up @@ -64,5 +65,7 @@ func IsPermanentBulkJobError(err error) bool {
!kvcoord.IsSendError(err) &&
!isBreakerOpenError(err) &&
!sysutil.IsErrConnectionReset(err) &&
!sysutil.IsErrConnectionRefused(err)
!sysutil.IsErrConnectionRefused(err) &&
!errors.Is(err, sqlinstance.NonExistentInstanceError)

}
70 changes: 50 additions & 20 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -1303,7 +1303,7 @@ func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem(
) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) {
nodeMap := make(map[base.SQLInstanceID]int)
resolver := func(nodeID roachpb.NodeID) base.SQLInstanceID {
return dsp.deprecatedSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID)
return dsp.healthySQLInstanceIDForKVNodeIDSystem(ctx, planCtx, nodeID)
}
for _, span := range spans {
var err error
Expand All @@ -1326,7 +1326,7 @@ func (dsp *DistSQLPlanner) deprecatedPartitionSpansSystem(
func (dsp *DistSQLPlanner) partitionSpans(
ctx context.Context, planCtx *PlanningCtx, spans roachpb.Spans,
) (partitions []SpanPartition, ignoreMisplannedRanges bool, _ error) {
resolver, instances, err := dsp.makeInstanceResolver(ctx, planCtx.localityFilter)
resolver, instances, err := dsp.makeInstanceResolver(ctx, planCtx)
if err != nil {
return nil, false, err
}
Expand Down Expand Up @@ -1363,11 +1363,11 @@ func (dsp *DistSQLPlanner) partitionSpans(
return partitions, ignoreMisplannedRanges, nil
}

// deprecatedSQLInstanceIDForKVNodeIDSystem returns the SQL instance that should
// handle the range with the given node ID when planning is done on behalf of
// the system tenant. It ensures that the chosen SQL instance is healthy and of
// the compatible DistSQL version.
func (dsp *DistSQLPlanner) deprecatedSQLInstanceIDForKVNodeIDSystem(
// healthySQLInstanceIDForKVNodeIDSystem returns the SQL instance that
// should handle the range with the given node ID when planning is
// done on behalf of the system tenant. It ensures that the chosen SQL
// instance is healthy and of the compatible DistSQL version.
func (dsp *DistSQLPlanner) healthySQLInstanceIDForKVNodeIDSystem(
ctx context.Context, planCtx *PlanningCtx, nodeID roachpb.NodeID,
) base.SQLInstanceID {
sqlInstanceID := base.SQLInstanceID(nodeID)
Expand All @@ -1382,15 +1382,39 @@ func (dsp *DistSQLPlanner) deprecatedSQLInstanceIDForKVNodeIDSystem(
return sqlInstanceID
}

// instanceIDForKVNodeHostedInstance returns the SQL instance ID for an
// instance that is hosted in the process of a KV node. Currently SQL
// healthySQLInstanceIDForKVNodeHostedInstanceResolver returns the SQL instance ID for
// an instance that is hosted in the process of a KV node. Currently SQL
// instances run in KV node processes have IDs fixed to be equal to the KV
// nodes' IDs, and all of the SQL instances for a given tenant are _either_
// run in this mixed mode or standalone, meaning if this server is in mixed
// mode, we can safely assume every other server is as well, and thus has
// IDs matching node IDs.
func instanceIDForKVNodeHostedInstance(nodeID roachpb.NodeID) base.SQLInstanceID {
return base.SQLInstanceID(nodeID)
// nodes' IDs, and all of the SQL instances for a given tenant are _either_ run
// in this mixed mode or standalone, meaning if this server is in mixed mode, we
// can safely assume every other server is as well, and thus has IDs matching
// node IDs.
//
// If the given node is not healthy, the gateway node is returned.
func (dsp *DistSQLPlanner) healthySQLInstanceIDForKVNodeHostedInstanceResolver(
ctx context.Context, planCtx *PlanningCtx,
) func(nodeID roachpb.NodeID) base.SQLInstanceID {
allHealthy, err := dsp.sqlAddressResolver.GetAllInstances(ctx)
if err != nil {
log.Warningf(ctx, "could not get all instances: %v", err)
return func(nodeID roachpb.NodeID) base.SQLInstanceID {
return dsp.gatewaySQLInstanceID
}
}

healthyNodes := make(map[base.SQLInstanceID]struct{}, len(allHealthy))
for _, n := range allHealthy {
healthyNodes[n.InstanceID] = struct{}{}
}

return func(nodeID roachpb.NodeID) base.SQLInstanceID {
sqlInstance := base.SQLInstanceID(nodeID)
if _, ok := healthyNodes[sqlInstance]; ok {
return sqlInstance
}
log.Warningf(ctx, "not planning on node %d", sqlInstance)
return dsp.gatewaySQLInstanceID
}
}

// makeInstanceResolver returns a function that can choose the SQL instance ID
Expand All @@ -1401,12 +1425,18 @@ func instanceIDForKVNodeHostedInstance(nodeID roachpb.NodeID) base.SQLInstanceID
// If the instance was assigned statically or the instance list had no locality
// information leading to random assignments then no instance list is returned.
func (dsp *DistSQLPlanner) makeInstanceResolver(
ctx context.Context, locFilter roachpb.Locality,
ctx context.Context, planCtx *PlanningCtx,
) (func(roachpb.NodeID) base.SQLInstanceID, []sqlinstance.InstanceInfo, error) {
_, mixedProcessMode := dsp.distSQLSrv.NodeID.OptionalNodeID()
locFilter := planCtx.localityFilter

var mixedProcessSameNodeResolver func(nodeID roachpb.NodeID) base.SQLInstanceID
if mixedProcessMode {
mixedProcessSameNodeResolver = dsp.healthySQLInstanceIDForKVNodeHostedInstanceResolver(ctx, planCtx)
}

if mixedProcessMode && locFilter.Empty() {
return instanceIDForKVNodeHostedInstance, nil, nil
return mixedProcessSameNodeResolver, nil, nil
}

// GetAllInstances only returns healthy instances.
Expand Down Expand Up @@ -1464,7 +1494,7 @@ func (dsp *DistSQLPlanner) makeInstanceResolver(
// locality filter in which case we can just use it.
if mixedProcessMode {
if ok, _ := nodeDesc.Locality.Matches(locFilter); ok {
return instanceIDForKVNodeHostedInstance(nodeID)
return mixedProcessSameNodeResolver(nodeID)
} else {
log.VEventf(ctx, 2,
"node %d locality %s does not match locality filter %s, finding alternative placement...",
Expand Down Expand Up @@ -1606,9 +1636,9 @@ func (dsp *DistSQLPlanner) getInstanceIDForScan(
}

if dsp.useGossipPlanning(ctx, planCtx) && planCtx.localityFilter.Empty() {
return dsp.deprecatedSQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID), nil
return dsp.healthySQLInstanceIDForKVNodeIDSystem(ctx, planCtx, replDesc.NodeID), nil
}
resolver, _, err := dsp.makeInstanceResolver(ctx, planCtx.localityFilter)
resolver, _, err := dsp.makeInstanceResolver(ctx, planCtx)
if err != nil {
return 0, err
}
Expand Down

0 comments on commit 33d0eed

Please sign in to comment.