From 17e7e5eb0b00366ceb4e7bbe57735b259bc7a2e4 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 10 Dec 2024 15:45:07 -0800 Subject: [PATCH 1/6] cmd/smith: add an option to only generate UDFs This can be helpful when extending PLpgSQL support in sqlsmith. Release note: None --- pkg/cmd/smith/main.go | 5 +++++ pkg/internal/sqlsmith/sqlsmith.go | 10 ++++++++++ 2 files changed, 15 insertions(+) diff --git a/pkg/cmd/smith/main.go b/pkg/cmd/smith/main.go index f33414d2d056..f1a45f10edcc 100644 --- a/pkg/cmd/smith/main.go +++ b/pkg/cmd/smith/main.go @@ -51,6 +51,7 @@ Options: var ( flags = flag.NewFlagSet(os.Args[0], flag.ContinueOnError) expr = flags.Bool("expr", false, "generate expressions instead of statements") + udfs = flags.Bool("udfs", false, "generate only CREATE FUNCTION statements") num = flags.Int("num", 1, "number of statements / expressions to generate") url = flags.String("url", "", "database to fetch schema from") execStmts = flags.Bool("exec-stmts", false, "execute each generated statement against the db specified by url") @@ -195,6 +196,10 @@ func main() { for i := 0; i < *num; i++ { fmt.Print(sep, smither.GenerateExpr(), "\n") } + } else if *udfs { + for i := 0; i < *num; i++ { + fmt.Print(sep, smither.GenerateUDF(), ";\n") + } } else { for i := 0; i < *num; i++ { stmt := smither.Generate() diff --git a/pkg/internal/sqlsmith/sqlsmith.go b/pkg/internal/sqlsmith/sqlsmith.go index 2cc4304f4b89..3ba8093cff33 100644 --- a/pkg/internal/sqlsmith/sqlsmith.go +++ b/pkg/internal/sqlsmith/sqlsmith.go @@ -228,6 +228,16 @@ func (s *Smither) GenerateExpr() tree.TypedExpr { return makeScalar(s, s.randScalarType(), nil) } +// GenerateUDF returns a random CREATE FUNCTION statement. +func (s *Smither) GenerateUDF() tree.Statement { + for { + routine, ok := s.makeCreateFunc() + if ok { + return routine + } + } +} + type nameGenInfo struct { g randident.NameGenerator count int From a3fd8ba5f4eaeab35059517145fc55dcc601187f Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Tue, 10 Dec 2024 17:09:47 -0800 Subject: [PATCH 2/6] sqlsmith: extend PLpgSQL support This commit adds support for WHILE and FOR (int) loops as well as SELECT INTO / RETURNING INTO variant of statements. Release note: None --- pkg/internal/sqlsmith/plpgsql.go | 83 +++++++++++++++++++++-- pkg/internal/sqlsmith/relational.go | 100 ++++++++++++++++++---------- 2 files changed, 140 insertions(+), 43 deletions(-) diff --git a/pkg/internal/sqlsmith/plpgsql.go b/pkg/internal/sqlsmith/plpgsql.go index 225cceccbf27..6522849431c5 100644 --- a/pkg/internal/sqlsmith/plpgsql.go +++ b/pkg/internal/sqlsmith/plpgsql.go @@ -39,6 +39,14 @@ func (s *Smither) makePLpgSQLBlock(scope plpgsqlBlockScope) *ast.Block { } } +func (s *Smither) makePLpgSQLVarName(prefix string, scope plpgsqlBlockScope) tree.Name { + varName := s.name(prefix) + for scope.hasVariable(string(varName)) { + varName = s.name(prefix) + } + return varName +} + func (s *Smither) makePLpgSQLDeclarations( scope plpgsqlBlockScope, ) ([]ast.Statement, plpgsqlBlockScope) { @@ -50,10 +58,7 @@ func (s *Smither) makePLpgSQLDeclarations( // TODO(#106368): add support for cursor declarations. 
decls := make([]ast.Statement, numDecls) for i := 0; i < numDecls; i++ { - varName := s.name("decl") - for newScope.hasVariable(string(varName)) { - varName = s.name("decl") - } + varName := s.makePLpgSQLVarName("decl", newScope) varTyp := s.randType() for varTyp.Identical(types.AnyTuple) || varTyp.Family() == types.CollatedStringFamily { // TODO(#114874): allow record types here when they are supported. @@ -134,6 +139,8 @@ var ( {1, makePLpgSQLBlock}, {2, makePLpgSQLReturn}, {2, makePLpgSQLIf}, + {2, makePLpgSQLWhile}, + {2, makePLpgSQLForLoop}, {5, makePLpgSQLNull}, {10, makePLpgSQLAssign}, {10, makePLpgSQLExecSQL}, @@ -172,13 +179,44 @@ func makePLpgSQLAssign(s *Smither, scope plpgsqlBlockScope) (stmt ast.Statement, } func makePLpgSQLExecSQL(s *Smither, scope plpgsqlBlockScope) (stmt ast.Statement, ok bool) { - // TODO(#106368): add support for SELECT/RETURNING INTO statements. const maxRetries = 5 var sqlStmt tree.Statement for i := 0; i < maxRetries; i++ { - sqlStmt, ok = s.makeSQLStmtForRoutine(scope.vol, scope.refs) + var desiredTypes []*types.T + var targets []ast.Variable + if s.coin() { + // Support INTO syntax. Pick a subset of variables to assign into. + usedVars := make(map[string]struct{}) + numNonConstVars := len(scope.vars) - len(scope.constants) + for len(usedVars) < numNonConstVars { + // Pick non-constant variable that hasn't been used yet. + var varName string + for { + varName = scope.vars[s.rnd.Intn(len(scope.vars))] + if scope.variableIsConstant(varName) { + continue + } + if _, used := usedVars[varName]; used { + continue + } + usedVars[varName] = struct{}{} + desiredTypes = append(desiredTypes, scope.varTypes[varName]) + targets = append(targets, tree.Name(varName)) + break + } + if s.coin() { + break + } + } + } + sqlStmt, ok = s.makeSQLStmtForRoutine(scope.vol, scope.refs, desiredTypes) if ok { - return &ast.Execute{SqlStmt: sqlStmt}, true + return &ast.Execute{ + SqlStmt: sqlStmt, + // Strict option won't matter if targets is empty. + Strict: s.d6() == 1, + Target: targets, + }, true } } return nil, false @@ -188,6 +226,37 @@ func makePLpgSQLNull(_ *Smither, _ plpgsqlBlockScope) (stmt ast.Statement, ok bo return &ast.Null{}, true } +func makePLpgSQLForLoop(s *Smither, scope plpgsqlBlockScope) (stmt ast.Statement, ok bool) { + // TODO(#105246): add support for other query and cursor FOR loops. + control := ast.IntForLoopControl{ + Reverse: s.coin(), + Lower: s.makePLpgSQLExpr(scope, types.Int), + Upper: s.makePLpgSQLExpr(scope, types.Int), + } + if s.coin() { + control.Step = s.makePLpgSQLExpr(scope, types.Int) + } + newScope := scope.makeChild(1 /* numNewVars */) + loopVarName := s.makePLpgSQLVarName("loop", newScope) + newScope.addVariable(string(loopVarName), types.Int, false /* constant */) + const maxLoopStmts = 3 + return &ast.ForLoop{ + // TODO(#106368): optionally add a label. + Target: []ast.Variable{loopVarName}, + Control: &control, + Body: s.makePLpgSQLStatements(newScope, maxLoopStmts), + }, true +} + +func makePLpgSQLWhile(s *Smither, scope plpgsqlBlockScope) (stmt ast.Statement, ok bool) { + const maxLoopStmts = 3 + return &ast.While{ + // TODO(#106368): optionally add a label. + Condition: s.makePLpgSQLCond(scope), + Body: s.makePLpgSQLStatements(scope, maxLoopStmts), + }, true +} + // plpgsqlBlockScope holds the information needed to ensure that generated // statements obey PL/pgSQL syntax and scoping rules. 
type plpgsqlBlockScope struct { diff --git a/pkg/internal/sqlsmith/relational.go b/pkg/internal/sqlsmith/relational.go index 00bd9537665b..c7f97a6c654e 100644 --- a/pkg/internal/sqlsmith/relational.go +++ b/pkg/internal/sqlsmith/relational.go @@ -1194,7 +1194,7 @@ func (s *Smither) makeRoutineBodySQL( stmts := make([]string, 0, stmtCnt) var stmt tree.Statement for i := 0; i < stmtCnt-1; i++ { - stmt, ok = s.makeSQLStmtForRoutine(vol, refs) + stmt, ok = s.makeSQLStmtForRoutine(vol, refs, nil /* desiredTypes */) if !ok { continue } @@ -1203,45 +1203,57 @@ func (s *Smither) makeRoutineBodySQL( // The return type of the last statement should match the function return // type. // If mutations are enabled, also use anything from mutatingTableExprs -- needs returning + desiredTypes := []*types.T{rTyp} if s.disableMutations || vol != tree.RoutineVolatile || s.coin() { - stmt, lastStmtRefs, ok = s.makeSelect([]*types.T{rTyp}, refs) + stmt, lastStmtRefs, ok = s.makeSelect(desiredTypes, refs) if !ok { return "", nil, false } } else { - var expr tree.TableExpr switch s.d6() { case 1, 2: - expr, lastStmtRefs, ok = s.makeInsertReturning(refs) + stmt, lastStmtRefs, ok = s.makeInsertReturning(desiredTypes, refs) case 3, 4: - expr, lastStmtRefs, ok = s.makeDeleteReturning(refs) + stmt, lastStmtRefs, ok = s.makeDeleteReturning(desiredTypes, refs) case 5, 6: - expr, lastStmtRefs, ok = s.makeUpdateReturning(refs) + stmt, lastStmtRefs, ok = s.makeUpdateReturning(desiredTypes, refs) } if !ok { return "", nil, false } - stmt = expr.(*tree.StatementSource).Statement } stmts = append(stmts, tree.AsStringWithFlags(stmt, tree.FmtParsable)) return "\n" + strings.Join(stmts, ";\n") + "\n", lastStmtRefs, true } func (s *Smither) makeSQLStmtForRoutine( - vol tree.RoutineVolatility, refs colRefs, + vol tree.RoutineVolatility, refs colRefs, desiredTypes []*types.T, ) (stmt tree.Statement, ok bool) { const numRetries = 5 for i := 0; i < numRetries; i++ { if s.disableMutations || vol != tree.RoutineVolatile || s.coin() { - stmt, _, ok = s.makeSelect(nil /* desiredTypes */, refs) + stmt, _, ok = s.makeSelect(desiredTypes, refs) } else { - switch s.d6() { - case 1, 2: - stmt, _, ok = s.makeInsert(refs) - case 3, 4: - stmt, _, ok = s.makeDelete(refs) - case 5, 6: - stmt, _, ok = s.makeUpdate(refs) + if len(desiredTypes) == 0 && s.coin() { + // If the caller didn't request particular result types, in 50% + // cases use the "vanilla" mutation stmts. 
+ switch s.d6() { + case 1, 2: + stmt, _, ok = s.makeInsert(refs) + case 3, 4: + stmt, _, ok = s.makeDelete(refs) + case 5, 6: + stmt, _, ok = s.makeUpdate(refs) + } + } else { + switch s.d6() { + case 1, 2: + stmt, _, ok = s.makeInsertReturning(desiredTypes, refs) + case 3, 4: + stmt, _, ok = s.makeDeleteReturning(desiredTypes, refs) + case 5, 6: + stmt, _, ok = s.makeUpdateReturning(desiredTypes, refs) + } } } if ok { @@ -1309,19 +1321,23 @@ func makeDeleteReturning(s *Smither, refs colRefs, forJoin bool) (tree.TableExpr if forJoin { return nil, nil, false } - return s.makeDeleteReturning(refs) + del, returningRefs, ok := s.makeDeleteReturning(nil /* desiredTypes */, refs) + if !ok { + return nil, nil, false + } + return &tree.StatementSource{Statement: del}, returningRefs, true } -func (s *Smither) makeDeleteReturning(refs colRefs) (tree.TableExpr, colRefs, bool) { +func (s *Smither) makeDeleteReturning( + desiredTypes []*types.T, refs colRefs, +) (*tree.Delete, colRefs, bool) { del, delRef, ok := s.makeDelete(refs) if !ok { return nil, nil, false } var returningRefs colRefs - del.Returning, returningRefs = s.makeReturning(delRef) - return &tree.StatementSource{ - Statement: del, - }, returningRefs, true + del.Returning, returningRefs = s.makeReturning(desiredTypes, delRef) + return del, returningRefs, true } func makeUpdate(s *Smither) (tree.Statement, bool) { @@ -1417,19 +1433,23 @@ func makeUpdateReturning(s *Smither, refs colRefs, forJoin bool) (tree.TableExpr if forJoin { return nil, nil, false } - return s.makeUpdateReturning(refs) + update, returningRefs, ok := s.makeUpdateReturning(nil /* desiredTypes */, refs) + if !ok { + return nil, nil, false + } + return &tree.StatementSource{Statement: update}, returningRefs, true } -func (s *Smither) makeUpdateReturning(refs colRefs) (tree.TableExpr, colRefs, bool) { +func (s *Smither) makeUpdateReturning( + desiredTypes []*types.T, refs colRefs, +) (*tree.Update, colRefs, bool) { update, updateRef, ok := s.makeUpdate(refs) if !ok { return nil, nil, false } var returningRefs colRefs - update.Returning, returningRefs = s.makeReturning(updateRef) - return &tree.StatementSource{ - Statement: update, - }, returningRefs, true + update.Returning, returningRefs = s.makeReturning(desiredTypes, updateRef) + return update, returningRefs, true } func makeInsert(s *Smither) (tree.Statement, bool) { @@ -1590,19 +1610,23 @@ func makeInsertReturning(s *Smither, refs colRefs, forJoin bool) (tree.TableExpr if forJoin { return nil, nil, false } - return s.makeInsertReturning(refs) + insert, returningRefs, ok := s.makeInsertReturning(nil /* desiredTypes */, refs) + if !ok { + return nil, nil, false + } + return &tree.StatementSource{Statement: insert}, returningRefs, true } -func (s *Smither) makeInsertReturning(refs colRefs) (tree.TableExpr, colRefs, bool) { +func (s *Smither) makeInsertReturning( + desiredTypes []*types.T, refs colRefs, +) (*tree.Insert, colRefs, bool) { insert, insertRef, ok := s.makeInsert(refs) if !ok { return nil, nil, false } var returningRefs colRefs - insert.Returning, returningRefs = s.makeReturning([]*tableRef{insertRef}) - return &tree.StatementSource{ - Statement: insert, - }, returningRefs, true + insert.Returning, returningRefs = s.makeReturning(desiredTypes, []*tableRef{insertRef}) + return insert, returningRefs, true } func makeValuesTable(s *Smither, refs colRefs, forJoin bool) (tree.TableExpr, colRefs, bool) { @@ -1800,8 +1824,12 @@ func makeLimit(s *Smither) *tree.Limit { return nil } -func (s *Smither) 
makeReturning(tables []*tableRef) (*tree.ReturningExprs, colRefs) {
-	desiredTypes := s.makeDesiredTypes()
+func (s *Smither) makeReturning(
+	desiredTypes []*types.T, tables []*tableRef,
+) (*tree.ReturningExprs, colRefs) {
+	if len(desiredTypes) == 0 {
+		desiredTypes = s.makeDesiredTypes()
+	}
 	var refs colRefs
 	for _, table := range tables {

From dca5db7793a786039a5ea0a6767ab5d97af89beb Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Tue, 10 Dec 2024 19:07:49 -0800
Subject: [PATCH 3/6] sql: pass session data to getPlanDistribution

This will be used in the following commit. The main difficulty of doing
so was the fact that we sometimes override DistSQLMode for pausable
portals. This commit temporarily adjusts the session data when
necessary and restores the original state right after the physical
planning check.

Release note: None
---
 pkg/sql/apply_join.go         |  2 +-
 pkg/sql/conn_executor_exec.go | 15 +++++++++++----
 pkg/sql/distsql_running.go    |  4 ++--
 pkg/sql/exec_util.go          |  6 +++---
 pkg/sql/explain_plan.go       |  2 +-
 pkg/sql/explain_vec.go        |  2 +-
 pkg/sql/schema_changer.go     |  2 +-
 7 files changed, 20 insertions(+), 13 deletions(-)

diff --git a/pkg/sql/apply_join.go b/pkg/sql/apply_join.go
index 9f6952120429..1550ad7f9080 100644
--- a/pkg/sql/apply_join.go
+++ b/pkg/sql/apply_join.go
@@ -317,7 +317,7 @@ func runPlanInsidePlan(
 	distributePlan, distSQLProhibitedErr := getPlanDistribution(
 		ctx, plannerCopy.Descriptors().HasUncommittedTypes(),
-		plannerCopy.SessionData().DistSQLMode, plan.main, &plannerCopy.distSQLVisitor,
+		plannerCopy.SessionData(), plan.main, &plannerCopy.distSQLVisitor,
 	)
 	distributeType := DistributionType(LocalDistribution)
 	if distributePlan.WillDistribute() {
diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go
index c24c639e671e..3b8563988681 100644
--- a/pkg/sql/conn_executor_exec.go
+++ b/pkg/sql/conn_executor_exec.go
@@ -1901,14 +1901,18 @@ func (ex *connExecutor) dispatchToExecutionEngine(

 	ex.sessionTracing.TracePlanCheckStart(ctx)

-	distSQLMode := ex.sessionData().DistSQLMode
+	var afterGetPlanDistribution func()
 	if planner.pausablePortal != nil {
 		if len(planner.curPlan.subqueryPlans) == 0 &&
 			len(planner.curPlan.cascades) == 0 &&
 			len(planner.curPlan.checkPlans) == 0 &&
 			len(planner.curPlan.triggers) == 0 {
-			// We only allow non-distributed plan for pausable portals.
-			distSQLMode = sessiondatapb.DistSQLOff
+			// We don't allow a distributed plan for pausable portals.
+			origDistSQLMode := ex.sessionData().DistSQLMode
+			ex.sessionData().DistSQLMode = sessiondatapb.DistSQLOff
+			afterGetPlanDistribution = func() {
+				ex.sessionData().DistSQLMode = origDistSQLMode
+			}
 		} else {
 			telemetry.Inc(sqltelemetry.SubOrPostQueryStmtsTriedWithPausablePortals)
 			// We don't allow sub / post queries for pausable portal.
Set it back to an @@ -1928,8 +1932,11 @@ func (ex *connExecutor) dispatchToExecutionEngine( } distributePlan, distSQLProhibitedErr := getPlanDistribution( ctx, planner.Descriptors().HasUncommittedTypes(), - distSQLMode, planner.curPlan.main, &planner.distSQLVisitor, + ex.sessionData(), planner.curPlan.main, &planner.distSQLVisitor, ) + if afterGetPlanDistribution != nil { + afterGetPlanDistribution() + } ex.sessionTracing.TracePlanCheckEnd(ctx, nil, distributePlan.WillDistribute()) if ex.server.cfg.TestingKnobs.BeforeExecute != nil { diff --git a/pkg/sql/distsql_running.go b/pkg/sql/distsql_running.go index 11fa9268f81a..5501a945419e 100644 --- a/pkg/sql/distsql_running.go +++ b/pkg/sql/distsql_running.go @@ -1829,7 +1829,7 @@ func (dsp *DistSQLPlanner) planAndRunSubquery( ) error { subqueryDistribution, distSQLProhibitedErr := getPlanDistribution( ctx, planner.Descriptors().HasUncommittedTypes(), - planner.SessionData().DistSQLMode, subqueryPlan.plan, &planner.distSQLVisitor, + planner.SessionData(), subqueryPlan.plan, &planner.distSQLVisitor, ) distribute := DistributionType(LocalDistribution) if subqueryDistribution.WillDistribute() { @@ -2440,7 +2440,7 @@ func (dsp *DistSQLPlanner) planAndRunPostquery( ) error { postqueryDistribution, distSQLProhibitedErr := getPlanDistribution( ctx, planner.Descriptors().HasUncommittedTypes(), - planner.SessionData().DistSQLMode, postqueryPlan, &planner.distSQLVisitor, + planner.SessionData(), postqueryPlan, &planner.distSQLVisitor, ) distribute := DistributionType(LocalDistribution) if postqueryDistribution.WillDistribute() { diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 41b6e0e8fee2..c6844b204f6b 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1932,7 +1932,7 @@ func shouldDistributeGivenRecAndMode( func getPlanDistribution( ctx context.Context, txnHasUncommittedTypes bool, - distSQLMode sessiondatapb.DistSQLExecMode, + sd *sessiondata.SessionData, plan planMaybePhysical, distSQLVisitor *distSQLExprCheckVisitor, ) (_ physicalplan.PlanDistribution, distSQLProhibitedErr error) { @@ -1949,7 +1949,7 @@ func getPlanDistribution( return physicalplan.LocalPlan, nil } - if distSQLMode == sessiondatapb.DistSQLOff { + if sd.DistSQLMode == sessiondatapb.DistSQLOff { return physicalplan.LocalPlan, nil } @@ -1965,7 +1965,7 @@ func getPlanDistribution( return physicalplan.LocalPlan, err } - if shouldDistributeGivenRecAndMode(rec, distSQLMode) { + if shouldDistributeGivenRecAndMode(rec, sd.DistSQLMode) { return physicalplan.FullyDistributedPlan, nil } return physicalplan.LocalPlan, nil diff --git a/pkg/sql/explain_plan.go b/pkg/sql/explain_plan.go index 96f28fcf90f1..185a295aae8b 100644 --- a/pkg/sql/explain_plan.go +++ b/pkg/sql/explain_plan.go @@ -59,7 +59,7 @@ func (e *explainPlanNode) startExec(params runParams) error { // created). 
	distribution, _ := getPlanDistribution(
 		params.ctx, params.p.Descriptors().HasUncommittedTypes(),
-		params.extendedEvalCtx.SessionData().DistSQLMode, plan.main, &params.p.distSQLVisitor,
+		params.extendedEvalCtx.SessionData(), plan.main, &params.p.distSQLVisitor,
 	)
 	outerSubqueries := params.p.curPlan.subqueryPlans
diff --git a/pkg/sql/explain_vec.go b/pkg/sql/explain_vec.go
index 3b5ccee85ed2..dd638526fdc1 100644
--- a/pkg/sql/explain_vec.go
+++ b/pkg/sql/explain_vec.go
@@ -38,7 +38,7 @@ func (n *explainVecNode) startExec(params runParams) error {
 	distSQLPlanner := params.extendedEvalCtx.DistSQLPlanner
 	distribution, _ := getPlanDistribution(
 		params.ctx, params.p.Descriptors().HasUncommittedTypes(),
-		params.extendedEvalCtx.SessionData().DistSQLMode, n.plan.main, &params.p.distSQLVisitor,
+		params.extendedEvalCtx.SessionData(), n.plan.main, &params.p.distSQLVisitor,
 	)
 	outerSubqueries := params.p.curPlan.subqueryPlans
 	planCtx := newPlanningCtxForExplainPurposes(distSQLPlanner, params, n.plan.subqueryPlans, distribution)
diff --git a/pkg/sql/schema_changer.go b/pkg/sql/schema_changer.go
index b57048a05fdd..7f21a107f2f0 100644
--- a/pkg/sql/schema_changer.go
+++ b/pkg/sql/schema_changer.go
@@ -457,7 +457,7 @@ func (sc *SchemaChanger) backfillQueryIntoTable(
 	planDistribution, _ := getPlanDistribution(
 		ctx, localPlanner.Descriptors().HasUncommittedTypes(),
-		localPlanner.extendedEvalCtx.SessionData().DistSQLMode,
+		localPlanner.extendedEvalCtx.SessionData(),
 		localPlanner.curPlan.main, &localPlanner.distSQLVisitor,
 	)
 	isLocal := !planDistribution.WillDistribute()

From 8269527b458c47e64d394790401e978fcd79f479 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Mon, 9 Dec 2024 19:21:00 -0800
Subject: [PATCH 4/6] sql: adjust physical planning heuristics around small
 sorts and aggregations

Previously, we had the following hard-coded behavior in the physical
planner: the presence of any aggregation or sort other than topK would
result in the `shouldDistribute` recommendation, which would be treated
under the default `distsql=auto` mode as "distribute the plan". This
can be suboptimal when we need to process a small number of rows (say,
on the order of 10-100 rows) because the overhead of setting up the
distributed plan could start to dominate.

This commit makes the heuristic a bit more configurable and - hopefully
- smarter: we will now choose to distribute only if the aggregation or
sort processes a "large" number of rows (1000 rows is the default
threshold to be considered "large"), and if we happen to get a "small"
set of rows for these operations, we will now use the `canDistribute`
recommendation.

This required plumbing the estimated number of rows that an operator
will read from its input down from the optimizer. In case we don't have
stats available, we will fall back to choosing the `shouldDistribute`
option: the thinking here is that not distributing when we should
(think large OLAP query) has a higher penalty than distributing when we
shouldn't (think small OLTP query), and it also resembles the behavior
before this change.

Release note (sql change): DistSQL physical planning decisions under
`distsql=auto` mode have been adjusted in the following manner: the
presence of an aggregation or a general sort no longer forces the plan
to be distributed.
Namely, we might not choose to distribute the plan if we expect to
process a small number of rows with these operations (less than 1000 by
default, configurable via the `distribute_group_by_row_count_threshold`
and `distribute_sort_row_count_threshold` session variables for
aggregations and sorts, respectively).
---
 pkg/sql/distsql_physical_planner.go           | 61 +++++++++++--------
 pkg/sql/distsql_spec_exec_factory.go          | 21 ++++++-
 pkg/sql/exec_factory_util.go                  |  2 +-
 pkg/sql/exec_util.go                          | 10 ++-
 pkg/sql/group.go                              |  4 ++
 .../testdata/logic_test/information_schema    |  2 +
 .../logictest/testdata/logic_test/pg_catalog  |  6 ++
 .../logictest/testdata/logic_test/show_source |  2 +
 pkg/sql/opt/exec/execbuilder/relational.go    | 21 ++++++-
 .../execbuilder/testdata/distsql_auto_mode    | 54 ++++++++++++++--
 pkg/sql/opt/exec/factory.opt                  | 12 ++++
 pkg/sql/opt_exec_factory.go                   | 42 +++++++------
 .../local_only_session_data.proto             |  8 +++
 pkg/sql/sort.go                               |  3 +
 pkg/sql/vars.go                               | 46 ++++++++++++++
 15 files changed, 241 insertions(+), 53 deletions(-)

diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index 09c5be0e62be..15936fea7512 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -46,6 +46,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
+	"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
 	"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
 	"github.com/cockroachdb/cockroach/pkg/sql/span"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
@@ -511,7 +512,7 @@ func mustWrapValuesNode(planCtx *PlanningCtx, specifiedInQuery bool) bool {
 // this plan couldn't be distributed.
 // TODO(radu): add tests for this.
 func checkSupportForPlanNode(
-	node planNode, distSQLVisitor *distSQLExprCheckVisitor,
+	node planNode, distSQLVisitor *distSQLExprCheckVisitor, sd *sessiondata.SessionData,
 ) (distRecommendation, error) {
 	switch n := node.(type) {
 	// Keep these cases alphabetized, please!
@@ -522,19 +523,19 @@ func checkSupportForPlanNode(
 		return shouldDistribute, nil

 	case *distinctNode:
-		return checkSupportForPlanNode(n.plan, distSQLVisitor)
+		return checkSupportForPlanNode(n.plan, distSQLVisitor, sd)

 	case *exportNode:
-		return checkSupportForPlanNode(n.source, distSQLVisitor)
+		return checkSupportForPlanNode(n.source, distSQLVisitor, sd)

 	case *filterNode:
 		if err := checkExprForDistSQL(n.filter, distSQLVisitor); err != nil {
 			return cannotDistribute, err
 		}
-		return checkSupportForPlanNode(n.source.plan, distSQLVisitor)
+		return checkSupportForPlanNode(n.source.plan, distSQLVisitor, sd)

 	case *groupNode:
-		rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor)
+		rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd)
 		if err != nil {
 			return cannotDistribute, err
 		}
@@ -544,7 +545,13 @@ func checkSupportForPlanNode(
 			}
 		}
 		// Distribute aggregations if possible.
-		return rec.compose(shouldDistribute), nil
+		aggRec := shouldDistribute
+		if n.estimatedInputRowCount != 0 && sd.DistributeGroupByRowCountThreshold > n.estimatedInputRowCount {
+			// Don't force distribution if we expect to process small number of
+			// rows.
+			aggRec = canDistribute
+		}
+		return rec.compose(aggRec), nil

 	case *indexJoinNode:
 		if n.table.lockingStrength != descpb.ScanLockingStrength_FOR_NONE {
@@ -556,13 +563,13 @@ func checkSupportForPlanNode(
 		}
 		// n.table doesn't have meaningful spans, but we need to check support (e.g.
 		// for any filtering expression).
- if _, err := checkSupportForPlanNode(n.table, distSQLVisitor); err != nil { + if _, err := checkSupportForPlanNode(n.table, distSQLVisitor, sd); err != nil { return cannotDistribute, err } - return checkSupportForPlanNode(n.input, distSQLVisitor) + return checkSupportForPlanNode(n.input, distSQLVisitor, sd) case *invertedFilterNode: - return checkSupportForInvertedFilterNode(n, distSQLVisitor) + return checkSupportForInvertedFilterNode(n, distSQLVisitor, sd) case *invertedJoinNode: if n.table.lockingStrength != descpb.ScanLockingStrength_FOR_NONE { @@ -575,7 +582,7 @@ func checkSupportForPlanNode( if err := checkExprForDistSQL(n.onExpr, distSQLVisitor); err != nil { return cannotDistribute, err } - rec, err := checkSupportForPlanNode(n.input, distSQLVisitor) + rec, err := checkSupportForPlanNode(n.input, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -585,11 +592,11 @@ func checkSupportForPlanNode( if err := checkExprForDistSQL(n.pred.onCond, distSQLVisitor); err != nil { return cannotDistribute, err } - recLeft, err := checkSupportForPlanNode(n.left.plan, distSQLVisitor) + recLeft, err := checkSupportForPlanNode(n.left.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } - recRight, err := checkSupportForPlanNode(n.right.plan, distSQLVisitor) + recRight, err := checkSupportForPlanNode(n.right.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -606,7 +613,7 @@ func checkSupportForPlanNode( // Note that we don't need to check whether we support distribution of // n.countExpr or n.offsetExpr because those expressions are evaluated // locally, during the physical planning. - return checkSupportForPlanNode(n.plan, distSQLVisitor) + return checkSupportForPlanNode(n.plan, distSQLVisitor, sd) case *lookupJoinNode: if n.remoteLookupExpr != nil || n.remoteOnlyLookups { @@ -630,7 +637,7 @@ func checkSupportForPlanNode( if err := checkExprForDistSQL(n.onCond, distSQLVisitor); err != nil { return cannotDistribute, err } - rec, err := checkSupportForPlanNode(n.input, distSQLVisitor) + rec, err := checkSupportForPlanNode(n.input, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -647,7 +654,7 @@ func checkSupportForPlanNode( return cannotDistribute, err } } - return checkSupportForPlanNode(n.source, distSQLVisitor) + return checkSupportForPlanNode(n.source, distSQLVisitor, sd) case *renderNode: for _, e := range n.render { @@ -655,7 +662,7 @@ func checkSupportForPlanNode( return cannotDistribute, err } } - return checkSupportForPlanNode(n.source.plan, distSQLVisitor) + return checkSupportForPlanNode(n.source.plan, distSQLVisitor, sd) case *scanNode: if n.lockingStrength != descpb.ScanLockingStrength_FOR_NONE { @@ -684,14 +691,20 @@ func checkSupportForPlanNode( } case *sortNode: - rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor) + rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } - return rec.compose(shouldDistribute), nil + sortRec := shouldDistribute + if n.estimatedInputRowCount != 0 && sd.DistributeSortRowCountThreshold > n.estimatedInputRowCount { + // Don't force distribution if we expect to process small number of + // rows. 
+ sortRec = canDistribute + } + return rec.compose(sortRec), nil case *topKNode: - rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor) + rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -702,11 +715,11 @@ func checkSupportForPlanNode( return canDistribute, nil case *unionNode: - recLeft, err := checkSupportForPlanNode(n.left, distSQLVisitor) + recLeft, err := checkSupportForPlanNode(n.left, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } - recRight, err := checkSupportForPlanNode(n.right, distSQLVisitor) + recRight, err := checkSupportForPlanNode(n.right, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -730,7 +743,7 @@ func checkSupportForPlanNode( return canDistribute, nil case *windowNode: - rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor) + rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } @@ -769,9 +782,9 @@ func checkSupportForPlanNode( } func checkSupportForInvertedFilterNode( - n *invertedFilterNode, distSQLVisitor *distSQLExprCheckVisitor, + n *invertedFilterNode, distSQLVisitor *distSQLExprCheckVisitor, sd *sessiondata.SessionData, ) (distRecommendation, error) { - rec, err := checkSupportForPlanNode(n.input, distSQLVisitor) + rec, err := checkSupportForPlanNode(n.input, distSQLVisitor, sd) if err != nil { return cannotDistribute, err } diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go index 56221f2cc602..975d81b24173 100644 --- a/pkg/sql/distsql_spec_exec_factory.go +++ b/pkg/sql/distsql_spec_exec_factory.go @@ -499,10 +499,17 @@ func (e *distSQLSpecExecFactory) constructAggregators( reqOrdering exec.OutputOrdering, isScalar bool, estimatedRowCount uint64, + estimatedInputRowCount uint64, ) (exec.Node, error) { physPlan, plan := getPhysPlan(input) // planAggregators() itself decides whether to distribute the aggregation. - planCtx := e.getPlanCtx(shouldDistribute) + aggRec := shouldDistribute + if estimatedInputRowCount != 0 && e.planner.SessionData().DistributeGroupByRowCountThreshold > estimatedInputRowCount { + // Don't force distribution if we expect to process small number of + // rows. 
+ aggRec = canDistribute + } + planCtx := e.getPlanCtx(aggRec) aggregationSpecs := make([]execinfrapb.AggregatorSpec_Aggregation, len(groupCols)+len(aggregations)) argumentsColumnTypes := make([][]*types.T, len(groupCols)+len(aggregations)) var err error @@ -562,6 +569,7 @@ func (e *distSQLSpecExecFactory) ConstructGroupBy( reqOrdering exec.OutputOrdering, groupingOrderType exec.GroupingOrderType, estimatedRowCount uint64, + estimatedInputRowCount uint64, ) (exec.Node, error) { return e.constructAggregators( input, @@ -571,11 +579,12 @@ func (e *distSQLSpecExecFactory) ConstructGroupBy( reqOrdering, false, /* isScalar */ estimatedRowCount, + estimatedInputRowCount, ) } func (e *distSQLSpecExecFactory) ConstructScalarGroupBy( - input exec.Node, aggregations []exec.AggInfo, + input exec.Node, aggregations []exec.AggInfo, estimatedInputRowCount uint64, ) (exec.Node, error) { return e.constructAggregators( input, @@ -585,6 +594,7 @@ func (e *distSQLSpecExecFactory) ConstructScalarGroupBy( exec.OutputOrdering{}, /* reqOrdering */ true, /* isScalar */ 1, /* estimatedRowCount */ + estimatedInputRowCount, ) } @@ -635,9 +645,14 @@ func (e *distSQLSpecExecFactory) ConstructUnionAll( } func (e *distSQLSpecExecFactory) ConstructSort( - input exec.Node, ordering exec.OutputOrdering, alreadyOrderedPrefix int, + input exec.Node, + ordering exec.OutputOrdering, + alreadyOrderedPrefix int, + estimatedInputRowCount uint64, ) (exec.Node, error) { physPlan, plan := getPhysPlan(input) + // TODO(yuzefovich): add better heuristics here so that we always distribute + // "large" sorts, as controlled by a session variable. e.dsp.addSorters(e.ctx, physPlan, colinfo.ColumnOrdering(ordering), alreadyOrderedPrefix, 0 /* limit */) // Since addition of sorters doesn't change any properties of the physical // plan, we don't need to update any of those. diff --git a/pkg/sql/exec_factory_util.go b/pkg/sql/exec_factory_util.go index cc97962c690b..7745722edc04 100644 --- a/pkg/sql/exec_factory_util.go +++ b/pkg/sql/exec_factory_util.go @@ -301,7 +301,7 @@ func constructVirtualScan( // Virtual indexes never provide a legitimate ordering, so we have to make // sure to sort if we have a required ordering. if len(reqOrdering) != 0 { - n, err = ef.ConstructSort(n, reqOrdering, 0) + n, err = ef.ConstructSort(n, reqOrdering, 0 /* alreadyOrderedPrefix */, 0 /* estimatedInputRowCount */) if err != nil { return nil, err } diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index c6844b204f6b..9955ed588d11 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1958,7 +1958,7 @@ func getPlanDistribution( return physicalplan.LocalPlan, nil } - rec, err := checkSupportForPlanNode(plan.planNode, distSQLVisitor) + rec, err := checkSupportForPlanNode(plan.planNode, distSQLVisitor, sd) if err != nil { // Don't use distSQL for this request. 
log.VEventf(ctx, 1, "query not supported for distSQL: %s", err) @@ -3312,6 +3312,14 @@ func (m *sessionDataMutator) SetPartiallyDistributedPlansDisabled(val bool) { m.data.PartiallyDistributedPlansDisabled = val } +func (m *sessionDataMutator) SetDistributeGroupByRowCountThreshold(val uint64) { + m.data.DistributeGroupByRowCountThreshold = val +} + +func (m *sessionDataMutator) SetDistributeSortRowCountThreshold(val uint64) { + m.data.DistributeSortRowCountThreshold = val +} + func (m *sessionDataMutator) SetDisableVecUnionEagerCancellation(val bool) { m.data.DisableVecUnionEagerCancellation = val } diff --git a/pkg/sql/group.go b/pkg/sql/group.go index d018a420d923..20d93ab28c7c 100644 --- a/pkg/sql/group.go +++ b/pkg/sql/group.go @@ -41,6 +41,10 @@ type groupNode struct { // estimatedRowCount, when set, is the estimated number of rows that this // groupNode will output. estimatedRowCount uint64 + + // estimatedInputRowCount, when set, is the estimated number of rows that + // this groupNode will read from its input. + estimatedInputRowCount uint64 } func (n *groupNode) startExec(params runParams) error { diff --git a/pkg/sql/logictest/testdata/logic_test/information_schema b/pkg/sql/logictest/testdata/logic_test/information_schema index 374ce8df6309..934a6c78de1f 100644 --- a/pkg/sql/logictest/testdata/logic_test/information_schema +++ b/pkg/sql/logictest/testdata/logic_test/information_schema @@ -3927,6 +3927,8 @@ disable_partially_distributed_plans off disable_plan_gists off disable_vec_union_eager_cancellation off disallow_full_table_scans off +distribute_group_by_row_count_threshold 1000 +distribute_sort_row_count_threshold 1000 distsql_plan_gateway_bias 2 enable_auto_rehoming off enable_create_stats_using_extremes on diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog index 7a48d9ebddbe..7979b751a46d 100644 --- a/pkg/sql/logictest/testdata/logic_test/pg_catalog +++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog @@ -2935,6 +2935,8 @@ disable_partially_distributed_plans off N disable_plan_gists off NULL NULL NULL string disable_vec_union_eager_cancellation off NULL NULL NULL string disallow_full_table_scans off NULL NULL NULL string +distribute_group_by_row_count_threshold 1000 NULL NULL NULL string +distribute_sort_row_count_threshold 1000 NULL NULL NULL string distsql off NULL NULL NULL string distsql_plan_gateway_bias 2 NULL NULL NULL string enable_auto_rehoming off NULL NULL NULL string @@ -3132,6 +3134,8 @@ disable_partially_distributed_plans off N disable_plan_gists off NULL user NULL off off disable_vec_union_eager_cancellation off NULL user NULL off off disallow_full_table_scans off NULL user NULL off off +distribute_group_by_row_count_threshold 1000 NULL user NULL 1000 1000 +distribute_sort_row_count_threshold 1000 NULL user NULL 1000 1000 distsql off NULL user NULL off off distsql_plan_gateway_bias 2 NULL user NULL 2 2 enable_auto_rehoming off NULL user NULL off off @@ -3325,6 +3329,8 @@ disable_partially_distributed_plans NULL NULL NULL disable_plan_gists NULL NULL NULL NULL NULL disable_vec_union_eager_cancellation NULL NULL NULL NULL NULL disallow_full_table_scans NULL NULL NULL NULL NULL +distribute_group_by_row_count_threshold NULL NULL NULL NULL NULL +distribute_sort_row_count_threshold NULL NULL NULL NULL NULL distsql NULL NULL NULL NULL NULL distsql_plan_gateway_bias NULL NULL NULL NULL NULL distsql_workmem NULL NULL NULL NULL NULL diff --git a/pkg/sql/logictest/testdata/logic_test/show_source 
b/pkg/sql/logictest/testdata/logic_test/show_source index 0a470adeac68..75c01a0e0352 100644 --- a/pkg/sql/logictest/testdata/logic_test/show_source +++ b/pkg/sql/logictest/testdata/logic_test/show_source @@ -68,6 +68,8 @@ disable_partially_distributed_plans off disable_plan_gists off disable_vec_union_eager_cancellation off disallow_full_table_scans off +distribute_group_by_row_count_threshold 1000 +distribute_sort_row_count_threshold 1000 distsql off distsql_plan_gateway_bias 2 enable_auto_rehoming off diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index 957db727783e..79289155dfc1 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -1704,7 +1704,12 @@ func (b *Builder) buildGroupBy(groupBy memo.RelExpr) (_ execPlan, outputCols col var ep execPlan if groupBy.Op() == opt.ScalarGroupByOp { - ep.root, err = b.factory.ConstructScalarGroupBy(input.root, aggInfos) + scalarGroupBy := groupBy.(*memo.ScalarGroupByExpr) + var inputRowCount uint64 + if inputRelProps := scalarGroupBy.Input.Relational(); inputRelProps.Statistics().Available { + inputRowCount = uint64(math.Ceil(inputRelProps.Statistics().RowCount)) + } + ep.root, err = b.factory.ConstructScalarGroupBy(input.root, aggInfos, inputRowCount) } else { groupBy := groupBy.(*memo.GroupByExpr) var groupingColOrder colinfo.ColumnOrdering @@ -1720,12 +1725,15 @@ func (b *Builder) buildGroupBy(groupBy memo.RelExpr) (_ execPlan, outputCols col return execPlan{}, colOrdMap{}, err } orderType := exec.GroupingOrderType(groupBy.GroupingOrderType(&groupBy.RequiredPhysical().Ordering)) - var rowCount uint64 + var rowCount, inputRowCount uint64 if relProps := groupBy.Relational(); relProps.Statistics().Available { rowCount = uint64(math.Ceil(relProps.Statistics().RowCount)) } + if inputRelProps := groupBy.Input.Relational(); inputRelProps.Statistics().Available { + inputRowCount = uint64(math.Ceil(inputRelProps.Statistics().RowCount)) + } ep.root, err = b.factory.ConstructGroupBy( - input.root, groupingColIdx, groupingColOrder, aggInfos, reqOrd, orderType, rowCount, + input.root, groupingColIdx, groupingColOrder, aggInfos, reqOrd, orderType, rowCount, inputRowCount, ) } if err != nil { @@ -2079,11 +2087,18 @@ func (b *Builder) buildSort(sort *memo.SortExpr) (_ execPlan, outputCols colOrdM if err != nil { return execPlan{}, colOrdMap{}, err } + + var inputRowCount uint64 + if inputRelProps := sort.Input.Relational(); inputRelProps.Statistics().Available { + inputRowCount = uint64(math.Ceil(inputRelProps.Statistics().RowCount)) + } + var ep execPlan ep.root, err = b.factory.ConstructSort( input.root, exec.OutputOrdering(sqlOrdering), alreadyOrderedPrefix, + inputRowCount, ) if err != nil { return execPlan{}, colOrdMap{}, err diff --git a/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode b/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode index 13f7a5d15136..9c2c112bba21 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode +++ b/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode @@ -15,14 +15,22 @@ CREATE TABLE kv (k INT PRIMARY KEY, v INT); ALTER TABLE kv SPLIT AT SELECT i FROM generate_series(1,5) AS g(i); ALTER TABLE kv EXPERIMENTAL_RELOCATE SELECT ARRAY[i], i FROM generate_series(1, 5) as g(i); +statement ok +ALTER TABLE kv INJECT STATISTICS '[ + { + "columns": ["k"], + "created_at": "2024-01-01 1:00:00.00000+00:00", + "row_count": 5000, + "distinct_count": 5000 + } +]'; + # Verify the JSON variant. 
query T EXPLAIN (DISTSQL, JSON) SELECT 1 ---- {"sql":"EXPLAIN (DISTSQL, JSON) SELECT 1","nodeNames":["1"],"processors":[{"nodeIdx":0,"inputs":[],"core":{"title":"local values 0/0","details":[]},"outputs":[],"stage":1,"processorID":0},{"nodeIdx":0,"inputs":[],"core":{"title":"Response","details":[]},"outputs":[],"stage":0,"processorID":-1}],"edges":[{"sourceProc":0,"sourceOutput":0,"destProc":1,"destInput":0,"streamID":0}],"flow_id":"00000000-0000-0000-0000-000000000000","flags":{"ShowInputTypes":false,"MakeDeterministic":true}} - - # Full table scan - distribute. query T SELECT info FROM [EXPLAIN SELECT * FROM kv] WHERE info LIKE 'distribution%' @@ -47,18 +55,56 @@ SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 AND v=1] WHERE info LIKE 'd ---- distribution: local -# Sort - distribute. +# Sort of a large set of rows - distribute. query T SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 ORDER BY v] WHERE info LIKE 'distribution%' ---- distribution: full -# Aggregation - distribute. +# Now consider the same set of rows small. +statement ok +SET distribute_sort_row_count_threshold = 10000; + +# Sort of a small table - don't distribute. +query T +SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 ORDER BY v] WHERE info LIKE 'distribution%' +---- +distribution: local + +statement ok +RESET distribute_sort_row_count_threshold; + +# Aggregation over a large set of rows - distribute. query T SELECT info FROM [EXPLAIN SELECT k, sum(v) FROM kv WHERE k>1 GROUP BY k] WHERE info LIKE 'distribution%' ---- distribution: full +# Scalar aggregation over a large set of rows - distribute. +query T +SELECT info FROM [EXPLAIN SELECT sum(v) FROM kv WHERE k>1] WHERE info LIKE 'distribution%' +---- +distribution: full + +# Now consider the same set of rows small. +statement ok +SET distribute_group_by_row_count_threshold = 10000; + +# Aggregation over a small set of rows - don't distribute. +query T +SELECT info FROM [EXPLAIN SELECT k, sum(v) FROM kv WHERE k>1 GROUP BY k] WHERE info LIKE 'distribution%' +---- +distribution: local + +# Scalar aggregation over a small set of rows - don't distribute. +query T +SELECT info FROM [EXPLAIN SELECT sum(v) FROM kv WHERE k>1] WHERE info LIKE 'distribution%' +---- +distribution: local + +statement ok +RESET distribute_group_by_row_count_threshold; + # Hard limit in scan - don't distribute. query T SELECT info FROM [EXPLAIN SELECT * FROM kv LIMIT 1] WHERE info LIKE 'distribution%' diff --git a/pkg/sql/opt/exec/factory.opt b/pkg/sql/opt/exec/factory.opt index d49720933175..6a7e511f4087 100644 --- a/pkg/sql/opt/exec/factory.opt +++ b/pkg/sql/opt/exec/factory.opt @@ -146,6 +146,10 @@ define GroupBy { # If set, the estimated number of rows that this GroupBy will output # (rounded up). estimatedRowCount uint64 + + # If set, the estimated number of rows that this GroupBy will read from its + # input (rounded up). + estimatedInputRowCount uint64 } # ScalarGroupBy runs a scalar aggregation, i.e. one which performs a set of @@ -155,6 +159,10 @@ define GroupBy { define ScalarGroupBy { Input exec.Node Aggregations []exec.AggInfo + + # If set, the estimated number of rows that this GroupBy will read from its + # input (rounded up). + estimatedInputRowCount uint64 } # Distinct filters out rows such that only the first row is kept for each set of @@ -241,6 +249,10 @@ define Sort { Input exec.Node Ordering exec.OutputOrdering AlreadyOrderedPrefix int + + # If set, the estimated number of rows that this sorter will read from its + # input (rounded up). 
+ estimatedInputRowCount uint64 } # Ordinality appends an ordinality column to each row in the input node. diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go index 53a044b100f7..4debe8b2d988 100644 --- a/pkg/sql/opt_exec_factory.go +++ b/pkg/sql/opt_exec_factory.go @@ -491,7 +491,7 @@ func (ef *execFactory) ConstructMergeJoin( // ConstructScalarGroupBy is part of the exec.Factory interface. func (ef *execFactory) ConstructScalarGroupBy( - input exec.Node, aggregations []exec.AggInfo, + input exec.Node, aggregations []exec.AggInfo, estimatedInputRowCount uint64, ) (exec.Node, error) { // There are no grouping columns with scalar GroupBy, so we create empty // arguments upfront to be passed into getResultColumnsForGroupBy call @@ -499,10 +499,12 @@ func (ef *execFactory) ConstructScalarGroupBy( var inputCols colinfo.ResultColumns var groupCols []exec.NodeColumnOrdinal n := &groupNode{ - plan: input.(planNode), - funcs: make([]*aggregateFuncHolder, 0, len(aggregations)), - columns: getResultColumnsForGroupBy(inputCols, groupCols, aggregations), - isScalar: true, + plan: input.(planNode), + funcs: make([]*aggregateFuncHolder, 0, len(aggregations)), + columns: getResultColumnsForGroupBy(inputCols, groupCols, aggregations), + isScalar: true, + estimatedRowCount: 1, + estimatedInputRowCount: estimatedInputRowCount, } if err := ef.addAggregations(n, aggregations); err != nil { return nil, err @@ -519,20 +521,22 @@ func (ef *execFactory) ConstructGroupBy( reqOrdering exec.OutputOrdering, groupingOrderType exec.GroupingOrderType, estimatedRowCount uint64, + estimatedInputRowCount uint64, ) (exec.Node, error) { inputPlan := input.(planNode) inputCols := planColumns(inputPlan) // TODO(harding): Use groupingOrder to determine when to use a hash // aggregator. n := &groupNode{ - plan: inputPlan, - funcs: make([]*aggregateFuncHolder, 0, len(groupCols)+len(aggregations)), - columns: getResultColumnsForGroupBy(inputCols, groupCols, aggregations), - groupCols: groupCols, - groupColOrdering: groupColOrdering, - isScalar: false, - reqOrdering: ReqOrdering(reqOrdering), - estimatedRowCount: estimatedRowCount, + plan: inputPlan, + funcs: make([]*aggregateFuncHolder, 0, len(groupCols)+len(aggregations)), + columns: getResultColumnsForGroupBy(inputCols, groupCols, aggregations), + groupCols: groupCols, + groupColOrdering: groupColOrdering, + isScalar: false, + reqOrdering: ReqOrdering(reqOrdering), + estimatedRowCount: estimatedRowCount, + estimatedInputRowCount: estimatedInputRowCount, } for _, col := range n.groupCols { // TODO(radu): only generate the grouping columns we actually need. @@ -635,12 +639,16 @@ func (ef *execFactory) ConstructUnionAll( // ConstructSort is part of the exec.Factory interface. 
func (ef *execFactory) ConstructSort( - input exec.Node, ordering exec.OutputOrdering, alreadyOrderedPrefix int, + input exec.Node, + ordering exec.OutputOrdering, + alreadyOrderedPrefix int, + estimatedInputRowCount uint64, ) (exec.Node, error) { return &sortNode{ - plan: input.(planNode), - ordering: colinfo.ColumnOrdering(ordering), - alreadyOrderedPrefix: alreadyOrderedPrefix, + plan: input.(planNode), + ordering: colinfo.ColumnOrdering(ordering), + alreadyOrderedPrefix: alreadyOrderedPrefix, + estimatedInputRowCount: estimatedInputRowCount, }, nil } diff --git a/pkg/sql/sessiondatapb/local_only_session_data.proto b/pkg/sql/sessiondatapb/local_only_session_data.proto index b201a1d211b8..250b56350bd3 100644 --- a/pkg/sql/sessiondatapb/local_only_session_data.proto +++ b/pkg/sql/sessiondatapb/local_only_session_data.proto @@ -563,6 +563,14 @@ message LocalOnlySessionData { // RecursionDepthLimit is the maximum depth that nested trigger-function calls // can reach. int64 recursion_depth_limit = 144; + // DistributeGroupByRowCountThreshold is the minimum number of rows estimated + // to be processed by the GroupBy operator so that we choose to distribute the + // plan because of this aggregator stage of DistSQL processors. + uint64 distribute_group_by_row_count_threshold = 145; + // DistributeSortRowCountThreshold is the minimum number of rows estimated + // to be processed by the Sort operator so that we choose to distribute the + // plan because of this sorter stage of DistSQL processors. + uint64 distribute_sort_row_count_threshold = 146; /////////////////////////////////////////////////////////////////////////// // WARNING: consider whether a session parameter you're adding needs to // diff --git a/pkg/sql/sort.go b/pkg/sql/sort.go index 121f29c92287..807f96f418eb 100644 --- a/pkg/sql/sort.go +++ b/pkg/sql/sort.go @@ -20,6 +20,9 @@ type sortNode struct { // When alreadyOrderedPrefix is non-zero, the input is already ordered on // the prefix ordering[:alreadyOrderedPrefix]. alreadyOrderedPrefix int + // estimatedInputRowCount, when set, is the estimated number of rows that + // this sortNode will read from its input. + estimatedInputRowCount uint64 } func (n *sortNode) startExec(runParams) error { diff --git a/pkg/sql/vars.go b/pkg/sql/vars.go index d9b9ca630873..d20b33725412 100644 --- a/pkg/sql/vars.go +++ b/pkg/sql/vars.go @@ -632,6 +632,52 @@ var varGen = map[string]sessionVar{ GlobalDefault: globalFalse, }, + // CockroachDB extension. + `distribute_group_by_row_count_threshold`: { + Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) { + return strconv.FormatUint(evalCtx.SessionData().DistributeGroupByRowCountThreshold, 10), nil + }, + GetStringVal: makeIntGetStringValFn(`distribute_group_by_row_count_threshold`), + Set: func(_ context.Context, m sessionDataMutator, s string) error { + i, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return err + } + if i < 0 { + return pgerror.Newf(pgcode.InvalidParameterValue, + "cannot set distribute_group_by_row_count_threshold to a negative value: %d", i) + } + m.SetDistributeGroupByRowCountThreshold(uint64(i)) + return nil + }, + GlobalDefault: func(sv *settings.Values) string { + return strconv.FormatUint(1000, 10) + }, + }, + + // CockroachDB extension. 
+ `distribute_sort_row_count_threshold`: { + Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) { + return strconv.FormatUint(evalCtx.SessionData().DistributeSortRowCountThreshold, 10), nil + }, + GetStringVal: makeIntGetStringValFn(`distribute_sort_row_count_threshold`), + Set: func(_ context.Context, m sessionDataMutator, s string) error { + i, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return err + } + if i < 0 { + return pgerror.Newf(pgcode.InvalidParameterValue, + "cannot set distribute_sort_row_count_threshold to a negative value: %d", i) + } + m.SetDistributeSortRowCountThreshold(uint64(i)) + return nil + }, + GlobalDefault: func(sv *settings.Values) string { + return strconv.FormatUint(1000, 10) + }, + }, + // CockroachDB extension. `disable_vec_union_eager_cancellation`: { GetStringVal: makePostgresBoolGetStringValFn(`disable_vec_union_eager_cancellation`), From 9d69824627d8f8e9dc01844f127703c445bf1b83 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Mon, 9 Dec 2024 22:06:34 -0800 Subject: [PATCH 5/6] sql: distribute large top K sorts This commit unifies the physical planning decision of the top K sort with the general sort. Previously, we always said that we "could distribute" the top K sort, but if we happened to perform the top K sort over a large set of rows, this could be suboptimal. This commit introduces the same heuristic for the top K sort as the previous one did for the general sort: if we expect to sort at least 1000 rows by default, then we now will choose to distribute the plan. Release note (sql change): DistSQL physical planning decisions under `distsql=auto` mode have been adjusted in the following manner: the top K sort over a large set of rows (1000 by default, controlled via `distribute_sort_row_count_threshold` session variable) will now force the plan distribution. --- pkg/sql/distsql_physical_planner.go | 9 +++++-- pkg/sql/distsql_spec_exec_factory.go | 9 +++++-- pkg/sql/opt/exec/execbuilder/relational.go | 8 ++++++- .../execbuilder/testdata/distsql_auto_mode | 24 +++++++++---------- pkg/sql/opt/exec/factory.opt | 4 ++++ pkg/sql/opt_exec_factory.go | 17 ++++++++----- pkg/sql/topk.go | 3 +++ 7 files changed, 51 insertions(+), 23 deletions(-) diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 15936fea7512..f98fcb232059 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -708,8 +708,13 @@ func checkSupportForPlanNode( if err != nil { return cannotDistribute, err } - // If we have a top K sort, we can distribute the query. - return rec.compose(canDistribute), nil + topKRec := shouldDistribute + if n.estimatedInputRowCount != 0 && sd.DistributeSortRowCountThreshold > n.estimatedInputRowCount { + // Don't force distribution if we expect to process small number of + // rows. 
+			topKRec = canDistribute
+		}
+		return rec.compose(topKRec), nil

 	case *unaryNode:
 		return canDistribute, nil
diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go
index 975d81b24173..db97c0c34d96 100644
--- a/pkg/sql/distsql_spec_exec_factory.go
+++ b/pkg/sql/distsql_spec_exec_factory.go
@@ -829,13 +829,18 @@ func (e *distSQLSpecExecFactory) ConstructLimit(
 }

 func (e *distSQLSpecExecFactory) ConstructTopK(
-	input exec.Node, k int64, ordering exec.OutputOrdering, alreadyOrderedPrefix int,
+	input exec.Node,
+	k int64,
+	ordering exec.OutputOrdering,
+	alreadyOrderedPrefix int,
+	estimatedInputRowCount uint64,
 ) (exec.Node, error) {
 	physPlan, plan := getPhysPlan(input)
 	if k <= 0 {
 		return nil, errors.New("negative or zero value for LIMIT")
 	}
-	// No already ordered prefix.
+	// TODO(yuzefovich): add better heuristics here so that we always distribute
+	// "large" sorts, as controlled by a session variable.
 	e.dsp.addSorters(e.ctx, physPlan, colinfo.ColumnOrdering(ordering), alreadyOrderedPrefix, k)
 	// Since addition of topk doesn't change any properties of
 	// the physical plan, we don't need to update any of those.
diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go
index 79289155dfc1..8b4963fc8e79 100644
--- a/pkg/sql/opt/exec/execbuilder/relational.go
+++ b/pkg/sql/opt/exec/execbuilder/relational.go
@@ -2028,12 +2028,18 @@ func (b *Builder) buildTopK(e *memo.TopKExpr) (_ execPlan, outputCols colOrdMap,
 	if err != nil {
 		return execPlan{}, colOrdMap{}, err
 	}
+	var inputRowCount uint64
+	if inputRelProps := e.Input.Relational(); inputRelProps.Statistics().Available {
+		inputRowCount = uint64(math.Ceil(inputRelProps.Statistics().RowCount))
+	}
 	var ep execPlan
 	ep.root, err = b.factory.ConstructTopK(
 		input.root,
 		e.K,
 		exec.OutputOrdering(sqlOrdering),
-		alreadyOrderedPrefix)
+		alreadyOrderedPrefix,
+		inputRowCount,
+	)
 	if err != nil {
 		return execPlan{}, colOrdMap{}, err
 	}
diff --git a/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode b/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode
index 9c2c112bba21..feb1af9df006 100644
--- a/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode
+++ b/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode
@@ -61,6 +61,12 @@ SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 ORDER BY v] WHERE info LIKE
 ----
 distribution: full

+# Top K over a large set of rows - distribute.
+query T
+SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 ORDER BY v LIMIT 1] WHERE info LIKE 'distribution%'
+----
+distribution: full
+
 # Now consider the same set of rows small.
 statement ok
 SET distribute_sort_row_count_threshold = 10000;
@@ -71,6 +77,12 @@ SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 ORDER BY v] WHERE info LIKE
 ----
 distribution: local

+# Top K over a small set of rows - don't distribute.
+query T
+SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 ORDER BY v LIMIT 1] WHERE info LIKE 'distribution%'
+----
+distribution: local
+
 statement ok
 RESET distribute_sort_row_count_threshold;

@@ -118,18 +130,6 @@ SELECT info FROM [EXPLAIN SELECT * FROM kv UNION SELECT * FROM kv LIMIT 1] WHERE
 ----
 distribution: full

-# Limit after sort (i.e. top K sort) - don't distribute.
-query T
-SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 ORDER BY v LIMIT 1] WHERE info LIKE 'distribution%'
-----
-distribution: local
-
-# General sort - distribute.
-query T
-SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 ORDER BY v] WHERE info LIKE 'distribution%'
-----
-distribution: full
-
 # Limit after aggregation - distribute.
 query T
 SELECT info FROM [EXPLAIN SELECT k, sum(v) FROM kv WHERE k>1 GROUP BY k LIMIT 1] WHERE info LIKE 'distribution%'
 ----
 distribution: full
diff --git a/pkg/sql/opt/exec/factory.opt b/pkg/sql/opt/exec/factory.opt
index 6a7e511f4087..0c99ee15df37 100644
--- a/pkg/sql/opt/exec/factory.opt
+++ b/pkg/sql/opt/exec/factory.opt
@@ -402,6 +402,10 @@ define TopK {
 	K int64
 	Ordering exec.OutputOrdering
 	AlreadyOrderedPrefix int
+
+	# If set, the estimated number of rows that this TopK sorter will read from
+	# its input (rounded up).
+	estimatedInputRowCount uint64
 }

 # Max1Row permits at most one row from the given input node, causing an error
diff --git a/pkg/sql/opt_exec_factory.go b/pkg/sql/opt_exec_factory.go
index 4debe8b2d988..7f06c2ef273a 100644
--- a/pkg/sql/opt_exec_factory.go
+++ b/pkg/sql/opt_exec_factory.go
@@ -1074,15 +1074,20 @@ func (ef *execFactory) ConstructLimit(
 	}, nil
 }

-// ConstructTopK is part of the execFactory interface.
+// ConstructTopK is part of the exec.Factory interface.
 func (ef *execFactory) ConstructTopK(
-	input exec.Node, k int64, ordering exec.OutputOrdering, alreadyOrderedPrefix int,
+	input exec.Node,
+	k int64,
+	ordering exec.OutputOrdering,
+	alreadyOrderedPrefix int,
+	estimatedInputRowCount uint64,
 ) (exec.Node, error) {
 	return &topKNode{
-		plan:                 input.(planNode),
-		k:                    k,
-		ordering:             colinfo.ColumnOrdering(ordering),
-		alreadyOrderedPrefix: alreadyOrderedPrefix,
+		plan:                   input.(planNode),
+		k:                      k,
+		ordering:               colinfo.ColumnOrdering(ordering),
+		alreadyOrderedPrefix:   alreadyOrderedPrefix,
+		estimatedInputRowCount: estimatedInputRowCount,
 	}, nil
 }

diff --git a/pkg/sql/topk.go b/pkg/sql/topk.go
index e48657420e7a..d3d6c35be6ac 100644
--- a/pkg/sql/topk.go
+++ b/pkg/sql/topk.go
@@ -21,6 +21,9 @@ type topKNode struct {
 	// When alreadyOrderedPrefix is non-zero, the input is already ordered on
 	// the prefix ordering[:alreadyOrderedPrefix].
 	alreadyOrderedPrefix int
+	// estimatedInputRowCount, when set, is the estimated number of rows that
+	// this topKNode will read from its input.
+	estimatedInputRowCount uint64
 }

 func (n *topKNode) startExec(params runParams) error {

From f6ff2fe31eb57fa6ea4776d8665384df2b985798 Mon Sep 17 00:00:00 2001
From: Yahor Yuzefovich
Date: Mon, 9 Dec 2024 23:01:54 -0800
Subject: [PATCH 6/6] sql: adjust physical planning heuristic for scans

This commit adjusts the physical planning heuristics for scans to be
closer to what previous commits did for aggregations and sorts.
Previously, we had a simple heuristic of always distributing full table
scans and never forcing the distribution of constrained scans. This can
be suboptimal from two angles:
- if we have a small table, then it might not be beneficial to
  distribute the full scan
- if we have a large constrained scan, then it might be beneficial to
  distribute it.

This commit addresses both concerns by forcing the distribution of
scans of a "large" set of rows (where "large" is controlled via the
`distribute_scan_row_count_threshold` session variable, 10k by default)
and using the "can distribute" recommendation for scans of small sets.
If we don't have stats available, then full table scans are distributed
and constrained scans are not (similar to the previous behavior).
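As a quick illustration (a hypothetical session, not part of this
patch; the threshold value is picked arbitrarily), the new knobs could
be exercised like so:

  SET distsql = auto;
  -- Recommend distribution for any scan estimated to read at least
  -- 5000 rows (hypothetical value; the default is 10000).
  SET distribute_scan_row_count_threshold = 5000;
  -- Restore the previous behavior of always distributing full scans.
  SET always_distribute_full_scans = true;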
The summary of changes:
- if we have a full table scan that is estimated to read fewer than 10k
rows, we no longer force the plan distribution
- the newly added `always_distribute_full_scans` session variable
allows us to get the previous behavior for full scans
- if we have a constrained scan that is estimated to read at least 10k
rows, we now always force the plan distribution.

Release note (sql change): DistSQL physical planning decisions under
`distsql=auto` mode have been adjusted in the following manner:
- full table scans that are estimated to read fewer than 10k rows
(where the number comes from the new
`distribute_scan_row_count_threshold` session variable) no longer force
the plan distribution
- the newly added `always_distribute_full_scans` session variable can
be set to `true` to get the previous behavior of always distributing
full scans
- large constrained table scans that are estimated to read at least 10k
rows (controlled via the same `distribute_scan_row_count_threshold`)
now always force the plan distribution.
---
 pkg/sql/distsql_physical_planner.go           | 27 +++---
 pkg/sql/distsql_spec_exec_factory.go          |  2 +
 pkg/sql/exec_util.go                          |  8 ++
 .../testdata/logic_test/information_schema    |  2 +
 .../logictest/testdata/logic_test/pg_catalog  |  6 ++
 .../logictest/testdata/logic_test/show_source |  2 +
 .../execbuilder/testdata/distsql_auto_mode    | 86 +++++++++++++++----
 .../local_only_session_data.proto             |  7 ++
 pkg/sql/vars.go                               | 40 +++++++++
 9 files changed, 149 insertions(+), 31 deletions(-)

diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go
index f98fcb232059..0d9d759165cf 100644
--- a/pkg/sql/distsql_physical_planner.go
+++ b/pkg/sql/distsql_physical_planner.go
@@ -673,22 +673,23 @@ func checkSupportForPlanNode(
 			return cannotDistribute, cannotDistributeRowLevelLockingErr
 		}
 
-		switch {
-		case n.localityOptimized:
+		if n.localityOptimized {
 			// This is a locality optimized scan.
 			return cannotDistribute, nil
-		case n.isFull:
-			// This is a full scan.
-			return shouldDistribute, nil
-		default:
-			// Although we don't yet recommend distributing plans where soft limits
-			// propagate to scan nodes because we don't have infrastructure to only
-			// plan for a few ranges at a time, the propagation of the soft limits
-			// to scan nodes has been added in 20.1 release, so to keep the
-			// previous behavior we continue to ignore the soft limits for now.
-			// TODO(yuzefovich): pay attention to the soft limits.
-			return canDistribute, nil
 		}
+		// TODO(yuzefovich): consider using the soft limit in making a decision
+		// here.
+		scanRec := canDistribute
+		if n.estimatedRowCount != 0 && n.estimatedRowCount >= sd.DistributeScanRowCountThreshold {
+			// This is a large scan, so we choose to distribute it.
+			scanRec = shouldDistribute
+		}
+		if n.isFull && (n.estimatedRowCount == 0 || sd.AlwaysDistributeFullScans) {
+			// Either we have no stats for this table, or the user has asked
+			// for full scans to always be distributed.
+			scanRec = shouldDistribute
+		}
+		return scanRec, nil
 	case *sortNode:
 		rec, err := checkSupportForPlanNode(n.plan, distSQLVisitor, sd)
diff --git a/pkg/sql/distsql_spec_exec_factory.go b/pkg/sql/distsql_spec_exec_factory.go
index db97c0c34d96..d658ffee7c44 100644
--- a/pkg/sql/distsql_spec_exec_factory.go
+++ b/pkg/sql/distsql_spec_exec_factory.go
@@ -236,6 +236,8 @@ func (e *distSQLSpecExecFactory) ConstructScan(
 	}
 
 	// Check if we are doing a full scan.
+	// TODO(yuzefovich): add better heuristics here so that we always distribute
+	// "large" scans, as controlled by a session variable.
 	if isFullTableOrIndexScan {
 		recommendation = recommendation.compose(shouldDistribute)
 	}
diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go
index 9955ed588d11..7ac2a206aa57 100644
--- a/pkg/sql/exec_util.go
+++ b/pkg/sql/exec_util.go
@@ -3320,6 +3320,14 @@ func (m *sessionDataMutator) SetDistributeSortRowCountThreshold(val uint64) {
 	m.data.DistributeSortRowCountThreshold = val
 }
 
+func (m *sessionDataMutator) SetDistributeScanRowCountThreshold(val uint64) {
+	m.data.DistributeScanRowCountThreshold = val
+}
+
+func (m *sessionDataMutator) SetAlwaysDistributeFullScans(val bool) {
+	m.data.AlwaysDistributeFullScans = val
+}
+
 func (m *sessionDataMutator) SetDisableVecUnionEagerCancellation(val bool) {
 	m.data.DisableVecUnionEagerCancellation = val
 }
diff --git a/pkg/sql/logictest/testdata/logic_test/information_schema b/pkg/sql/logictest/testdata/logic_test/information_schema
index 934a6c78de1f..dd5ffd503638 100644
--- a/pkg/sql/logictest/testdata/logic_test/information_schema
+++ b/pkg/sql/logictest/testdata/logic_test/information_schema
@@ -3891,6 +3891,7 @@ variable value
 allow_ordinal_column_references off
 allow_role_memberships_to_change_during_transaction off
 alter_primary_region_super_region_override off
+always_distribute_full_scans off
 application_name ·
 authentication_method cert-password
 avoid_buffering off
@@ -3928,6 +3929,7 @@ disable_plan_gists off
 disable_vec_union_eager_cancellation off
 disallow_full_table_scans off
 distribute_group_by_row_count_threshold 1000
+distribute_scan_row_count_threshold 10000
 distribute_sort_row_count_threshold 1000
 distsql_plan_gateway_bias 2
 enable_auto_rehoming off
diff --git a/pkg/sql/logictest/testdata/logic_test/pg_catalog b/pkg/sql/logictest/testdata/logic_test/pg_catalog
index 7979b751a46d..d1d511ab366f 100644
--- a/pkg/sql/logictest/testdata/logic_test/pg_catalog
+++ b/pkg/sql/logictest/testdata/logic_test/pg_catalog
@@ -2897,6 +2897,7 @@ name setting c
 allow_ordinal_column_references off NULL NULL NULL string
 allow_role_memberships_to_change_during_transaction off NULL NULL NULL string
 alter_primary_region_super_region_override off NULL NULL NULL string
+always_distribute_full_scans off NULL NULL NULL string
 application_name · NULL NULL NULL string
 authentication_method cert-password NULL NULL NULL string
 autocommit_before_ddl off NULL NULL NULL string
@@ -2936,6 +2937,7 @@ disable_plan_gists off N
 disable_vec_union_eager_cancellation off NULL NULL NULL string
 disallow_full_table_scans off NULL NULL NULL string
 distribute_group_by_row_count_threshold 1000 NULL NULL NULL string
+distribute_scan_row_count_threshold 10000 NULL NULL NULL string
 distribute_sort_row_count_threshold 1000 NULL NULL NULL string
 distsql off NULL NULL NULL string
 distsql_plan_gateway_bias 2 NULL NULL NULL string
@@ -3096,6 +3098,7 @@ name setting u
 allow_ordinal_column_references off NULL user NULL off off
 allow_role_memberships_to_change_during_transaction off NULL user NULL off off
 alter_primary_region_super_region_override off NULL user NULL off off
+always_distribute_full_scans off NULL user NULL off off
 application_name · NULL user NULL · ·
 authentication_method cert-password NULL user NULL cert-password cert-password
 autocommit_before_ddl off NULL user NULL off off
@@ -3135,6 +3138,7 @@ disable_plan_gists off N
 disable_vec_union_eager_cancellation off NULL user NULL off off
 disallow_full_table_scans off NULL user NULL off off
 distribute_group_by_row_count_threshold 1000 NULL user NULL 1000 1000
+distribute_scan_row_count_threshold 10000 NULL user NULL 10000 10000
 distribute_sort_row_count_threshold 1000 NULL user NULL 1000 1000
 distsql off NULL user NULL off off
 distsql_plan_gateway_bias 2 NULL user NULL 2 2
@@ -3288,6 +3292,7 @@ name source min_val max_
 allow_ordinal_column_references NULL NULL NULL NULL NULL
 allow_role_memberships_to_change_during_transaction NULL NULL NULL NULL NULL
 alter_primary_region_super_region_override NULL NULL NULL NULL NULL
+always_distribute_full_scans NULL NULL NULL NULL NULL
 application_name NULL NULL NULL NULL NULL
 authentication_method NULL NULL NULL NULL NULL
 autocommit_before_ddl NULL NULL NULL NULL NULL
@@ -3330,6 +3335,7 @@ disable_plan_gists NULL NULL NULL
 disable_vec_union_eager_cancellation NULL NULL NULL NULL NULL
 disallow_full_table_scans NULL NULL NULL NULL NULL
 distribute_group_by_row_count_threshold NULL NULL NULL NULL NULL
+distribute_scan_row_count_threshold NULL NULL NULL NULL NULL
 distribute_sort_row_count_threshold NULL NULL NULL NULL NULL
 distsql NULL NULL NULL NULL NULL
 distsql_plan_gateway_bias NULL NULL NULL NULL NULL
diff --git a/pkg/sql/logictest/testdata/logic_test/show_source b/pkg/sql/logictest/testdata/logic_test/show_source
index 75c01a0e0352..e5603c71bdee 100644
--- a/pkg/sql/logictest/testdata/logic_test/show_source
+++ b/pkg/sql/logictest/testdata/logic_test/show_source
@@ -30,6 +30,7 @@ variable value
 allow_ordinal_column_references off
 allow_role_memberships_to_change_during_transaction off
 alter_primary_region_super_region_override off
+always_distribute_full_scans off
 application_name ·
 authentication_method cert-password
 autocommit_before_ddl off
@@ -69,6 +70,7 @@ disable_plan_gists off
 disable_vec_union_eager_cancellation off
 disallow_full_table_scans off
 distribute_group_by_row_count_threshold 1000
+distribute_scan_row_count_threshold 10000
 distribute_sort_row_count_threshold 1000
 distsql off
 distsql_plan_gateway_bias 2
diff --git a/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode b/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode
index feb1af9df006..262b19ce8eb6 100644
--- a/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode
+++ b/pkg/sql/opt/exec/execbuilder/testdata/distsql_auto_mode
@@ -15,6 +15,18 @@ CREATE TABLE kv (k INT PRIMARY KEY, v INT);
 ALTER TABLE kv SPLIT AT SELECT i FROM generate_series(1,5) AS g(i);
 ALTER TABLE kv EXPERIMENTAL_RELOCATE SELECT ARRAY[i], i FROM generate_series(1, 5) as g(i);
 
+# Full table scan (no stats) - distribute.
+query T
+SELECT info FROM [EXPLAIN SELECT * FROM kv] WHERE info LIKE 'distribution%'
+----
+distribution: full
+
+# Constrained scan (no stats) - don't distribute.
+query T
+SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1] WHERE info LIKE 'distribution%'
+----
+distribution: local
+
 statement ok
 ALTER TABLE kv INJECT STATISTICS '[
   {
@@ -31,25 +43,76 @@ EXPLAIN (DISTSQL, JSON) SELECT 1
 ----
 {"sql":"EXPLAIN (DISTSQL, JSON) SELECT 1","nodeNames":["1"],"processors":[{"nodeIdx":0,"inputs":[],"core":{"title":"local values 0/0","details":[]},"outputs":[],"stage":1,"processorID":0},{"nodeIdx":0,"inputs":[],"core":{"title":"Response","details":[]},"outputs":[],"stage":0,"processorID":-1}],"edges":[{"sourceProc":0,"sourceOutput":0,"destProc":1,"destInput":0,"streamID":0}],"flow_id":"00000000-0000-0000-0000-000000000000","flags":{"ShowInputTypes":false,"MakeDeterministic":true}}
 
-# Full table scan - distribute.
+# Full table scan of a small table - don't distribute.
+query T +SELECT info FROM [EXPLAIN SELECT * FROM kv] WHERE info LIKE 'distribution%' +---- +distribution: local + +# Force full table scan of a small table to be distributed. +statement ok +SET always_distribute_full_scans = true; + query T SELECT info FROM [EXPLAIN SELECT * FROM kv] WHERE info LIKE 'distribution%' ---- distribution: full -# Partial scan - don't distribute. +statement ok +RESET always_distribute_full_scans; + +# Small constrained scan - don't distribute. query T -SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k=1] WHERE info LIKE 'distribution%' +SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1] WHERE info LIKE 'distribution%' ---- distribution: local -# Partial scan - don't distribute. +# Consider the following scans large. +statement ok +SET distribute_scan_row_count_threshold = 1; + +# Full scan of a large table - distribute. +query T +SELECT info FROM [EXPLAIN SELECT * FROM kv] WHERE info LIKE 'distribution%' +---- +distribution: full + +# Large constrained scan - distribute. query T SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1] WHERE info LIKE 'distribution%' ---- +distribution: full + +# Large constrained scan with filter - distribute. +query T +SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 AND v=1] WHERE info LIKE 'distribution%' +---- +distribution: full + +# Hard limit in a large scan - don't distribute. +query T +SELECT info FROM [EXPLAIN SELECT * FROM kv LIMIT 1] WHERE info LIKE 'distribution%' +---- distribution: local -# Partial scan with filter - don't distribute. +# TODO(yuzefovich): we shouldn't distribute this query due to a soft limit, but +# soft limits are currently ignored. +query T +SELECT info FROM [EXPLAIN SELECT * FROM kv UNION SELECT * FROM kv LIMIT 1] WHERE info LIKE 'distribution%' +---- +distribution: full + +# Now consider all constrained scans through the end of the file small. +statement ok +SET distribute_scan_row_count_threshold = 100000; + +# Small constrained scan - don't distribute. +query T +SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1] WHERE info LIKE 'distribution%' +---- +distribution: local + +# Small constrained scan with filter - don't distribute. query T SELECT info FROM [EXPLAIN SELECT * FROM kv WHERE k>1 AND v=1] WHERE info LIKE 'distribution%' ---- @@ -117,19 +180,6 @@ distribution: local statement ok RESET distribute_group_by_row_count_threshold; -# Hard limit in scan - don't distribute. -query T -SELECT info FROM [EXPLAIN SELECT * FROM kv LIMIT 1] WHERE info LIKE 'distribution%' ----- -distribution: local - -# Soft limit in scan - don't distribute. -# TODO(yuzefovich): soft limits are currently ignored in scans. -query T -SELECT info FROM [EXPLAIN SELECT * FROM kv UNION SELECT * FROM kv LIMIT 1] WHERE info LIKE 'distribution%' ----- -distribution: full - # Limit after aggregation - distribute. query T SELECT info FROM [EXPLAIN SELECT k, sum(v) FROM kv WHERE k>1 GROUP BY k LIMIT 1] WHERE info LIKE 'distribution%' diff --git a/pkg/sql/sessiondatapb/local_only_session_data.proto b/pkg/sql/sessiondatapb/local_only_session_data.proto index 250b56350bd3..25be303a9b69 100644 --- a/pkg/sql/sessiondatapb/local_only_session_data.proto +++ b/pkg/sql/sessiondatapb/local_only_session_data.proto @@ -571,6 +571,13 @@ message LocalOnlySessionData { // to be processed by the Sort operator so that we choose to distribute the // plan because of this sorter stage of DistSQL processors. 
uint64 distribute_sort_row_count_threshold = 146; + // DistributeScanRowCountThreshold is the minimum number of rows estimated to + // be read by the Scan operator so that we choose to distribute the plan + // because of this TableReader stage of DistSQL processors. + uint64 distribute_scan_row_count_threshold = 147; + // AlwaysDistributeFullScans determines whether full table scans always force + // the plan to be distributed, regardless of the estimated row count. + bool always_distribute_full_scans = 148; /////////////////////////////////////////////////////////////////////////// // WARNING: consider whether a session parameter you're adding needs to // diff --git a/pkg/sql/vars.go b/pkg/sql/vars.go index d20b33725412..b8f1378272d2 100644 --- a/pkg/sql/vars.go +++ b/pkg/sql/vars.go @@ -678,6 +678,46 @@ var varGen = map[string]sessionVar{ }, }, + // CockroachDB extension. + `distribute_scan_row_count_threshold`: { + Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) { + return strconv.FormatUint(evalCtx.SessionData().DistributeScanRowCountThreshold, 10), nil + }, + GetStringVal: makeIntGetStringValFn(`distribute_scan_row_count_threshold`), + Set: func(_ context.Context, m sessionDataMutator, s string) error { + i, err := strconv.ParseInt(s, 10, 64) + if err != nil { + return err + } + if i < 0 { + return pgerror.Newf(pgcode.InvalidParameterValue, + "cannot set distribute_scan_row_count_threshold to a negative value: %d", i) + } + m.SetDistributeScanRowCountThreshold(uint64(i)) + return nil + }, + GlobalDefault: func(sv *settings.Values) string { + return strconv.FormatUint(10000, 10) + }, + }, + + // CockroachDB extension. + `always_distribute_full_scans`: { + GetStringVal: makePostgresBoolGetStringValFn(`always_distribute_full_scans`), + Set: func(_ context.Context, m sessionDataMutator, s string) error { + b, err := paramparse.ParseBoolVar("always_distribute_full_scans", s) + if err != nil { + return err + } + m.SetAlwaysDistributeFullScans(b) + return nil + }, + Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) { + return formatBoolAsPostgresSetting(evalCtx.SessionData().AlwaysDistributeFullScans), nil + }, + GlobalDefault: globalFalse, + }, + // CockroachDB extension. `disable_vec_union_eager_cancellation`: { GetStringVal: makePostgresBoolGetStringValFn(`disable_vec_union_eager_cancellation`),