Skip to content

Commit

Permalink
Merge #99081 #99282
Browse files Browse the repository at this point in the history
99081: sql: set index recommendations after planning but before execution r=yuzefovich a=yuzefovich

This commit moves the call to set the index recommendations to be done
right after planning was completed. Previously, it was done after the
execution, but it makes more sense to do it after planning. This also
allows us to remove the check on the txn still being open.

This required clarifying how `instrumentationHelper.indexRecs` is used.
Previously, it was used for two purposes:
- for recording recommendations to be included in the
`statement_statistics` system table
- for showing when executing EXPLAIN statement.

These two usages have somewhat different requirements, so this commit
splits them out into two different slices. This also allows us to reuse
the recommendations from the latter should we choose to generate the
recommendations for the former (previously, this would result in
redundant regeneration of the recommendations).

Epic: None

Release note: None

99282:  backupccl: support 'execution locality' option in scheduled backups r=dt a=dt

Release note: none.
Epic: CRDB-9547.

Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: David Taylor <[email protected]>
  • Loading branch information
3 people committed Mar 23, 2023
3 parents b7f527a + 0cbb425 + 85eb8d9 commit 436613f
Show file tree
Hide file tree
Showing 9 changed files with 101 additions and 39 deletions.
11 changes: 11 additions & 0 deletions pkg/ccl/backupccl/alter_backup_schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,10 @@ func processOptionsForArgs(inOpts tree.BackupOptions, outOpts *tree.BackupOption
outOpts.CaptureRevisionHistory = inOpts.CaptureRevisionHistory
}

if inOpts.ExecutionLocality != nil {
outOpts.ExecutionLocality = inOpts.ExecutionLocality
}

if inOpts.IncludeAllSecondaryTenants != nil {
outOpts.IncludeAllSecondaryTenants = inOpts.IncludeAllSecondaryTenants
}
Expand All @@ -352,6 +356,13 @@ func processOptionsForArgs(inOpts tree.BackupOptions, outOpts *tree.BackupOption
outOpts.EncryptionPassphrase = inOpts.EncryptionPassphrase
}
}
if inOpts.ExecutionLocality != nil {
if tree.AsStringWithFlags(inOpts.ExecutionLocality, tree.FmtBareStrings) == "" {
outOpts.ExecutionLocality = nil
} else {
outOpts.ExecutionLocality = inOpts.ExecutionLocality
}
}
if inOpts.EncryptionKMSURI != nil {
if tree.AsStringWithFlags(&inOpts.EncryptionKMSURI, tree.FmtBareStrings) == "" {
outOpts.EncryptionKMSURI = nil
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/backup_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ func resolveOptionsForBackupJobDescription(
newOpts := tree.BackupOptions{
CaptureRevisionHistory: opts.CaptureRevisionHistory,
Detached: opts.Detached,
ExecutionLocality: opts.ExecutionLocality,
}

if opts.EncryptionPassphrase != nil {
Expand Down
17 changes: 17 additions & 0 deletions pkg/ccl/backupccl/create_scheduled_backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type scheduledBackupSpec struct {
kmsURIs []string
incrementalStorage []string
includeAllSecondaryTenants *bool
execLoc *string
}

func makeScheduleDetails(opts map[string]string) (jobspb.ScheduleDetails, error) {
Expand Down Expand Up @@ -234,6 +235,10 @@ func doCreateBackupSchedules(
}
}

if eval.execLoc != nil && *eval.execLoc != "" {
backupNode.Options.ExecutionLocality = tree.NewStrVal(*eval.execLoc)
}

// Evaluate encryption KMS URIs if set.
// Only one of encryption passphrase and KMS URI should be set, but this check
// is done during backup planning so we do not need to worry about it here.
Expand Down Expand Up @@ -688,6 +693,17 @@ func makeScheduledBackupSpec(
}
spec.captureRevisionHistory = &capture
}

if schedule.BackupOptions.ExecutionLocality != nil {
loc, err := exprEval.String(
ctx, schedule.BackupOptions.ExecutionLocality,
)
if err != nil {
return nil, err
}
spec.execLoc = &loc
}

if schedule.BackupOptions.IncludeAllSecondaryTenants != nil {
includeSecondary, err := exprEval.Bool(ctx,
schedule.BackupOptions.IncludeAllSecondaryTenants)
Expand Down Expand Up @@ -761,6 +777,7 @@ func createBackupScheduleTypeCheck(
schedule.ScheduleLabelSpec.Label,
schedule.Recurrence,
schedule.BackupOptions.EncryptionPassphrase,
schedule.BackupOptions.ExecutionLocality,
}
if schedule.FullBackup != nil {
stringExprs = append(stringExprs, schedule.FullBackup.Recurrence)
Expand Down
23 changes: 23 additions & 0 deletions pkg/ccl/backupccl/create_scheduled_backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobstest"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs"
"github.com/cockroachdb/cockroach/pkg/scheduledjobs/schedulebase"
"github.com/cockroachdb/cockroach/pkg/security/username"
Expand Down Expand Up @@ -97,6 +98,7 @@ func newTestHelper(t *testing.T) (*testHelper, func()) {
}

args := base.TestServerArgs{
Locality: roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "of-france"}}},
Settings: cluster.MakeClusterSettings(),
ExternalIODir: dir,
// Some scheduled backup tests fail when run within a tenant. More
Expand Down Expand Up @@ -639,6 +641,27 @@ func TestSerializesScheduledBackupExecutionArgs(t *testing.T) {
},
},
},
{
name: "exec-loc",
user: enterpriseUser,
query: `
CREATE SCHEDULE FOR BACKUP DATABASE system
INTO 'nodelocal://0/backup'
WITH revision_history, execution locality = 'region=of-france'
RECURRING '1 2 * * *'
FULL BACKUP ALWAYS
WITH SCHEDULE OPTIONS first_run=$1
`,
queryArgs: []interface{}{th.env.Now().Add(time.Minute)},
expectedSchedules: []expectedSchedule{
{
nameRe: "BACKUP .+",
backupStmt: "BACKUP DATABASE system INTO 'nodelocal://0/backup' " +
"WITH revision_history = true, detached, execution locality = 'region=of-france'",
period: 24 * time.Hour,
},
},
},
{
name: "missing-destination-placeholder",
query: `CREATE SCHEDULE FOR BACKUP TABLE system.public.jobs INTO $1 RECURRING '@hourly'`,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
new-cluster name=s1 allow-implicit-access
new-cluster name=s1 allow-implicit-access localities=us-east-1
----

# Create test schedules.
Expand Down Expand Up @@ -35,16 +35,20 @@ exec-sql
alter backup schedule $incID set with revision_history = false;
----

exec-sql
alter backup schedule $incID set with execution locality = 'region=us-east-1'
----

query-sql
with schedules as (show schedules) select id, command->'backup_statement' from schedules where label='datatest' order by command->>'backup_type' asc;
----
$fullID "BACKUP INTO 'nodelocal://1/example-schedule' WITH revision_history = false, detached"
$incID "BACKUP INTO LATEST IN 'nodelocal://1/example-schedule' WITH revision_history = false, detached"
$fullID "BACKUP INTO 'nodelocal://1/example-schedule' WITH revision_history = false, detached, execution locality = 'region=us-east-1'"
$incID "BACKUP INTO LATEST IN 'nodelocal://1/example-schedule' WITH revision_history = false, detached, execution locality = 'region=us-east-1'"

# Change an option and set another.

exec-sql
alter backup schedule $incID set with revision_history = true, set with encryption_passphrase = 'abc';
alter backup schedule $incID set with revision_history = true, set with execution locality = '', set with encryption_passphrase = 'abc';
----

query-sql
Expand Down
17 changes: 6 additions & 11 deletions pkg/sql/conn_executor_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -1372,17 +1372,6 @@ func (ex *connExecutor) dispatchToExecutionEngine(

populateQueryLevelStatsAndRegions(ctx, planner, ex.server.cfg, &stats, &ex.cpuStatsCollector)

// The transaction (from planner.txn) may already have been committed at this point,
// due to one-phase commit optimization or an error. Since we use that transaction
// on the optimizer, check if is still open before generating index recommendations.
if planner.txn.IsOpen() {
// Set index recommendations, so it can be saved on statement statistics.
// TODO(yuzefovich): figure out whether we want to set isInternalPlanner
// to true for the internal executors.
isInternal := ex.executorType == executorTypeInternal || planner.isInternalPlanner
planner.instrumentation.SetIndexRecommendations(ctx, ex.server.idxRecommendationsCache, planner, isInternal)
}

// Record the statement summary. This also closes the plan if the
// plan has not been closed earlier.
stmtFingerprintID = ex.recordStatementSummary(
Expand Down Expand Up @@ -1681,6 +1670,12 @@ func (ex *connExecutor) makeExecPlan(ctx context.Context, planner *planner) erro
ex.extraTxnState.numDDL++
}

// Set index recommendations, so it can be saved on statement statistics.
// TODO(yuzefovich): figure out whether we want to set isInternalPlanner
// to true for the internal executors.
isInternal := ex.executorType == executorTypeInternal || planner.isInternalPlanner
planner.instrumentation.SetIndexRecommendations(ctx, ex.server.idxRecommendationsCache, planner, isInternal)

return nil
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/explain_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (e *explainPlanNode) startExec(params runParams) error {
}
}
// Add index recommendations to output, if they exist.
if recs := params.p.instrumentation.indexRecs; recs != nil {
if recs := params.p.instrumentation.explainIndexRecs; recs != nil {
// First add empty row.
rows = append(rows, "")
rows = append(rows, fmt.Sprintf("index recommendations: %d", len(recs)))
Expand Down
38 changes: 22 additions & 16 deletions pkg/sql/instrumentation.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,11 @@ type instrumentationHelper struct {
costEstimate float64

// indexRecs contains index recommendations for the planned statement. It
// will only be populated if the statement is an EXPLAIN statement, or if
// recommendations are requested for the statement for populating the
// statement_statistics table.
// will only be populated if recommendations are requested for the statement
// for populating the statement_statistics table.
indexRecs []indexrec.Rec
// explainIndexRecs contains index recommendations for EXPLAIN statements.
explainIndexRecs []indexrec.Rec

// maxFullScanRows is the maximum number of rows scanned by a full scan, as
// estimated by the optimizer.
Expand Down Expand Up @@ -789,24 +790,29 @@ func (ih *instrumentationHelper) SetIndexRecommendations(
stmtType,
isInternal,
) {
opc := &planner.optPlanningCtx
opc.reset(ctx)
f := opc.optimizer.Factory()
evalCtx := opc.p.EvalContext()
f.Init(ctx, evalCtx, opc.catalog)
f.FoldingControl().AllowStableFolds()
bld := optbuilder.New(ctx, &opc.p.semaCtx, evalCtx, opc.catalog, f, opc.p.stmt.AST)
err := bld.Build()
if err != nil {
log.Warningf(ctx, "unable to build memo: %s", err)
// If the statement is an EXPLAIN, then we might have already generated
// the index recommendations. If so, we can skip generation here.
if ih.explainIndexRecs != nil {
recommendations = ih.explainIndexRecs
} else {
err = opc.makeQueryIndexRecommendation(ctx)
opc := &planner.optPlanningCtx
opc.reset(ctx)
f := opc.optimizer.Factory()
evalCtx := opc.p.EvalContext()
f.Init(ctx, evalCtx, opc.catalog)
f.FoldingControl().AllowStableFolds()
bld := optbuilder.New(ctx, &opc.p.semaCtx, evalCtx, opc.catalog, f, opc.p.stmt.AST)
err := bld.Build()
if err != nil {
log.Warningf(ctx, "unable to generate index recommendations: %s", err)
log.Warningf(ctx, "unable to build memo: %s", err)
} else {
recommendations, err = opc.makeQueryIndexRecommendation(ctx)
if err != nil {
log.Warningf(ctx, "unable to generate index recommendations: %s", err)
}
}
}
reset = true
recommendations = ih.indexRecs
}
ih.indexRecs = idxRec.UpdateIndexRecommendations(
ih.fingerprint,
Expand Down
19 changes: 12 additions & 7 deletions pkg/sql/plan_opt.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,9 +578,11 @@ func (opc *optPlanningCtx) buildExecMemo(ctx context.Context) (_ *memo.Memo, _ e
// find potential index candidates in the memo.
_, isExplain := opc.p.stmt.AST.(*tree.Explain)
if isExplain && p.SessionData().IndexRecommendationsEnabled {
if err := opc.makeQueryIndexRecommendation(ctx); err != nil {
indexRecs, err := opc.makeQueryIndexRecommendation(ctx)
if err != nil {
return nil, err
}
opc.p.instrumentation.explainIndexRecs = indexRecs
}

if _, isCanned := opc.p.stmt.AST.(*tree.CannedOptPlan); !isCanned {
Expand Down Expand Up @@ -721,7 +723,9 @@ func (p *planner) DecodeGist(gist string, external bool) ([]string, error) {
// indexes hypothetically added to the table. An index recommendation for the
// query is outputted based on which hypothetical indexes are helpful in the
// optimal plan.
func (opc *optPlanningCtx) makeQueryIndexRecommendation(ctx context.Context) (err error) {
func (opc *optPlanningCtx) makeQueryIndexRecommendation(
ctx context.Context,
) (_ []indexrec.Rec, err error) {
defer func() {
if r := recover(); r != nil {
// This code allows us to propagate internal errors without having to add
Expand Down Expand Up @@ -757,7 +761,7 @@ func (opc *optPlanningCtx) makeQueryIndexRecommendation(ctx context.Context) (er
return ruleName.IsNormalize()
})
if _, err = opc.optimizer.Optimize(); err != nil {
return err
return nil, err
}

// Walk through the fully normalized memo to determine index candidates and
Expand All @@ -775,12 +779,13 @@ func (opc *optPlanningCtx) makeQueryIndexRecommendation(ctx context.Context) (er
)
opc.optimizer.Memo().Metadata().UpdateTableMeta(f.EvalContext(), hypTables)
if _, err = opc.optimizer.Optimize(); err != nil {
return err
return nil, err
}

opc.p.instrumentation.indexRecs, err = indexrec.FindRecs(ctx, f.Memo().RootExpr(), f.Metadata())
var indexRecs []indexrec.Rec
indexRecs, err = indexrec.FindRecs(ctx, f.Memo().RootExpr(), f.Metadata())
if err != nil {
return err
return nil, err
}

// Re-initialize the optimizer (which also re-initializes the factory) and
Expand All @@ -794,5 +799,5 @@ func (opc *optPlanningCtx) makeQueryIndexRecommendation(ctx context.Context) (er
f.CopyWithoutAssigningPlaceholders,
)

return nil
return indexRecs, nil
}

0 comments on commit 436613f

Please sign in to comment.