From e00f324f764bf34edcceb36f4a37284b8b575208 Mon Sep 17 00:00:00 2001 From: Yahor Yuzefovich Date: Fri, 17 Mar 2023 18:16:42 -0700 Subject: [PATCH 01/12] sql: address a few nits from reviewing an old PR This commit addresses several nits (a typo, missing periods, and precisely allocating a slice) that I noticed while reviewing the old PR which introduced DELETE FROM ... USING support. Epic: None Release note: None --- pkg/sql/opt/exec/execbuilder/mutation.go | 3 ++- pkg/sql/opt/exec/factory.opt | 2 +- pkg/sql/opt/optbuilder/mutation_builder.go | 4 ++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/sql/opt/exec/execbuilder/mutation.go b/pkg/sql/opt/exec/execbuilder/mutation.go index 788e35d46c80..2ea1b6af3780 100644 --- a/pkg/sql/opt/exec/execbuilder/mutation.go +++ b/pkg/sql/opt/exec/execbuilder/mutation.go @@ -511,9 +511,10 @@ func (b *Builder) buildDelete(del *memo.DeleteExpr) (execPlan, error) { fetchColOrds := ordinalSetFromColList(del.FetchCols) returnColOrds := ordinalSetFromColList(del.ReturnCols) - //Construct the result columns for the passthrough set + // Construct the result columns for the passthrough set. var passthroughCols colinfo.ResultColumns if del.NeedResults() { + passthroughCols = make(colinfo.ResultColumns, 0, len(del.PassthroughCols)) for _, passthroughCol := range del.PassthroughCols { colMeta := b.mem.Metadata().ColumnMeta(passthroughCol) passthroughCols = append(passthroughCols, colinfo.ResultColumn{Name: colMeta.Alias, Typ: colMeta.Type}) diff --git a/pkg/sql/opt/exec/factory.opt b/pkg/sql/opt/exec/factory.opt index 4e7715d21d0c..9a509d0e87d7 100644 --- a/pkg/sql/opt/exec/factory.opt +++ b/pkg/sql/opt/exec/factory.opt @@ -566,7 +566,7 @@ define Upsert { # as they appear in the table schema. # # The passthrough parameter contains all the result columns that are part of -# the input node that the update node needs to return (passing through from +# the input node that the delete node needs to return (passing through from # the input). The pass through columns are used to return any column from the # USING tables that are referenced in the RETURNING clause. define Delete { diff --git a/pkg/sql/opt/optbuilder/mutation_builder.go b/pkg/sql/opt/optbuilder/mutation_builder.go index 101164006ae2..bbcda324e8db 100644 --- a/pkg/sql/opt/optbuilder/mutation_builder.go +++ b/pkg/sql/opt/optbuilder/mutation_builder.go @@ -417,7 +417,7 @@ func (mb *mutationBuilder) buildInputForDelete( if usingClausePresent { usingScope := mb.b.buildFromTables(using, noRowLocking, inScope) - // Check that the same table name is not used multiple times + // Check that the same table name is not used multiple times. mb.b.validateJoinTableNames(mb.fetchScope, usingScope) // The USING table columns can be accessed by the RETURNING clause of the @@ -460,7 +460,7 @@ func (mb *mutationBuilder) buildInputForDelete( mb.outScope = projectionsScope // Build a distinct on to ensure there is at most one row in the joined output - // for every row in the table + // for every row in the table. 
if usingClausePresent { var pkCols opt.ColSet From 69b94315ee4a6ea8305ed425afd1fe3c7b2d5003 Mon Sep 17 00:00:00 2001 From: Ricky Stewart Date: Fri, 31 Mar 2023 10:58:47 -0500 Subject: [PATCH 02/12] bazel: upgrade to bazel 5.4.0 Release note: None Epic: none --- .bazelversion | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.bazelversion b/.bazelversion index a541fd8b0f51..6852e0d130e3 100644 --- a/.bazelversion +++ b/.bazelversion @@ -1 +1 @@ -cockroachdb/5.1.0 +cockroachdb/5.4.0 From 04f7b0e429ba24682a2e109d3f7cbfa20cb3b503 Mon Sep 17 00:00:00 2001 From: Marcus Gartner Date: Wed, 29 Mar 2023 17:21:33 -0400 Subject: [PATCH 03/12] opt: remove panics in execbuilder This commit replaces `panic`s in execbuilder with returned errors. This is safer because execbuilder functions are invoked outside the panic-recoverable `Optimizer.Optimize` function. Release note: None --- pkg/sql/opt/exec/execbuilder/builder.go | 8 +- pkg/sql/opt/exec/execbuilder/mutation.go | 53 ++-- pkg/sql/opt/exec/execbuilder/relational.go | 350 +++++++++++++++------ pkg/sql/opt/exec/execbuilder/scalar.go | 34 +- pkg/sql/opt/exec/execbuilder/statement.go | 11 +- 5 files changed, 331 insertions(+), 125 deletions(-) diff --git a/pkg/sql/opt/exec/execbuilder/builder.go b/pkg/sql/opt/exec/execbuilder/builder.go index 6fa9aab95667..48b52702ea3b 100644 --- a/pkg/sql/opt/exec/execbuilder/builder.go +++ b/pkg/sql/opt/exec/execbuilder/builder.go @@ -263,17 +263,17 @@ func (b *Builder) Build() (_ exec.Plan, err error) { return b.factory.ConstructPlan(plan.root, b.subqueries, b.cascades, b.checks, rootRowCount) } -func (b *Builder) wrapFunction(fnName string) tree.ResolvableFunctionReference { +func (b *Builder) wrapFunction(fnName string) (tree.ResolvableFunctionReference, error) { if b.evalCtx != nil && b.catalog != nil { // Some tests leave those unset. unresolved := tree.MakeUnresolvedName(fnName) fnDef, err := b.catalog.ResolveFunction( context.Background(), &unresolved, &b.evalCtx.SessionData().SearchPath) if err != nil { - panic(err) + return tree.ResolvableFunctionReference{}, err } - return tree.ResolvableFunctionReference{FunctionReference: fnDef} + return tree.ResolvableFunctionReference{FunctionReference: fnDef}, nil } - return tree.WrapFunction(fnName) + return tree.WrapFunction(fnName), nil } func (b *Builder) build(e opt.Expr) (_ execPlan, err error) { diff --git a/pkg/sql/opt/exec/execbuilder/mutation.go b/pkg/sql/opt/exec/execbuilder/mutation.go index 788e35d46c80..3ff92dede96e 100644 --- a/pkg/sql/opt/exec/execbuilder/mutation.go +++ b/pkg/sql/opt/exec/execbuilder/mutation.go @@ -32,7 +32,11 @@ import ( func (b *Builder) buildMutationInput( mutExpr, inputExpr memo.RelExpr, colList opt.ColList, p *memo.MutationPrivate, ) (execPlan, error) { - if b.shouldApplyImplicitLockingToMutationInput(mutExpr) { + shouldApplyImplicitLocking, err := b.shouldApplyImplicitLockingToMutationInput(mutExpr) + if err != nil { + return execPlan{}, err + } + if shouldApplyImplicitLocking { // Re-entrance is not possible because mutations are never nested. 
b.forceForUpdateLocking = true defer func() { b.forceForUpdateLocking = false }() @@ -198,19 +202,19 @@ func (b *Builder) tryBuildFastPathInsert(ins *memo.InsertExpr) (_ execPlan, ok b out := &fkChecks[i] out.InsertCols = make([]exec.TableColumnOrdinal, len(lookupJoin.KeyCols)) - findCol := func(cols opt.OptionalColList, col opt.ColumnID) int { - res, ok := cols.Find(col) - if !ok { - panic(errors.AssertionFailedf("cannot find column %d", col)) - } - return res - } for i, keyCol := range lookupJoin.KeyCols { // The keyCol comes from the WithScan operator. We must find the matching // column in the mutation input. - withColOrd := findCol(opt.OptionalColList(withScan.OutCols), keyCol) + withColOrd, ok := withScan.OutCols.Find(keyCol) + if !ok { + return execPlan{}, false, errors.AssertionFailedf("cannot find column %d", keyCol) + } inputCol := withScan.InCols[withColOrd] - out.InsertCols[i] = exec.TableColumnOrdinal(findCol(ins.InsertCols, inputCol)) + inputColOrd, ok := ins.InsertCols.Find(inputCol) + if !ok { + return execPlan{}, false, errors.AssertionFailedf("cannot find column %d", inputCol) + } + out.InsertCols[i] = exec.TableColumnOrdinal(inputColOrd) } out.ReferencedTable = md.Table(lookupJoin.Table) @@ -435,7 +439,10 @@ func (b *Builder) buildUpsert(ups *memo.UpsertExpr) (execPlan, error) { tab := md.Table(ups.Table) canaryCol := exec.NodeColumnOrdinal(-1) if ups.CanaryCol != 0 { - canaryCol = input.getNodeColumnOrdinal(ups.CanaryCol) + canaryCol, err = input.getNodeColumnOrdinal(ups.CanaryCol) + if err != nil { + return execPlan{}, err + } } insertColOrds := ordinalSetFromColList(ups.InsertCols) fetchColOrds := ordinalSetFromColList(ups.FetchCols) @@ -712,7 +719,11 @@ func (b *Builder) buildUniqueChecks(checks memo.UniqueChecksExpr) error { mkErr := func(row tree.Datums) error { keyVals := make(tree.Datums, len(c.KeyCols)) for i, col := range c.KeyCols { - keyVals[i] = row[query.getNodeColumnOrdinal(col)] + ord, err := query.getNodeColumnOrdinal(col) + if err != nil { + return err + } + keyVals[i] = row[ord] } return mkUniqueCheckErr(md, c, keyVals) } @@ -738,7 +749,11 @@ func (b *Builder) buildFKChecks(checks memo.FKChecksExpr) error { mkErr := func(row tree.Datums) error { keyVals := make(tree.Datums, len(c.KeyCols)) for i, col := range c.KeyCols { - keyVals[i] = row[query.getNodeColumnOrdinal(col)] + ord, err := query.getNodeColumnOrdinal(col) + if err != nil { + return err + } + keyVals[i] = row[ord] } return mkFKCheckErr(md, c, keyVals) } @@ -961,26 +976,26 @@ var forUpdateLocking = opt.Locking{Strength: tree.ForUpdate} // shouldApplyImplicitLockingToMutationInput determines whether or not the // builder should apply a FOR UPDATE row-level locking mode to the initial row // scan of a mutation expression. -func (b *Builder) shouldApplyImplicitLockingToMutationInput(mutExpr memo.RelExpr) bool { +func (b *Builder) shouldApplyImplicitLockingToMutationInput(mutExpr memo.RelExpr) (bool, error) { switch t := mutExpr.(type) { case *memo.InsertExpr: // Unlike with the other three mutation expressions, it never makes // sense to apply implicit row-level locking to the input of an INSERT // expression because any contention results in unique constraint // violations. 
- return false + return false, nil case *memo.UpdateExpr: - return b.shouldApplyImplicitLockingToUpdateInput(t) + return b.shouldApplyImplicitLockingToUpdateInput(t), nil case *memo.UpsertExpr: - return b.shouldApplyImplicitLockingToUpsertInput(t) + return b.shouldApplyImplicitLockingToUpsertInput(t), nil case *memo.DeleteExpr: - return b.shouldApplyImplicitLockingToDeleteInput(t) + return b.shouldApplyImplicitLockingToDeleteInput(t), nil default: - panic(errors.AssertionFailedf("unexpected mutation expression %T", t)) + return false, errors.AssertionFailedf("unexpected mutation expression %T", t) } } diff --git a/pkg/sql/opt/exec/execbuilder/relational.go b/pkg/sql/opt/exec/execbuilder/relational.go index e7ca485b2a59..36d5fe370749 100644 --- a/pkg/sql/opt/exec/execbuilder/relational.go +++ b/pkg/sql/opt/exec/execbuilder/relational.go @@ -111,37 +111,46 @@ func (ep *execPlan) makeBuildScalarCtx() buildScalarCtx { // getNodeColumnOrdinal takes a column that is known to be produced by the execPlan // and returns the ordinal index of that column in the result columns of the // node. -func (ep *execPlan) getNodeColumnOrdinal(col opt.ColumnID) exec.NodeColumnOrdinal { +func (ep *execPlan) getNodeColumnOrdinal(col opt.ColumnID) (exec.NodeColumnOrdinal, error) { ord, ok := ep.outputCols.Get(int(col)) if !ok { - panic(errors.AssertionFailedf("column %d not in input", redact.Safe(col))) + return 0, errors.AssertionFailedf("column %d not in input", redact.Safe(col)) } - return exec.NodeColumnOrdinal(ord) + return exec.NodeColumnOrdinal(ord), nil } -func (ep *execPlan) getNodeColumnOrdinalSet(cols opt.ColSet) exec.NodeColumnOrdinalSet { +func (ep *execPlan) getNodeColumnOrdinalSet(cols opt.ColSet) (exec.NodeColumnOrdinalSet, error) { var res exec.NodeColumnOrdinalSet - cols.ForEach(func(colID opt.ColumnID) { - res.Add(int(ep.getNodeColumnOrdinal(colID))) - }) - return res + for colID, ok := cols.Next(0); ok; colID, ok = cols.Next(colID + 1) { + colOrd, err := ep.getNodeColumnOrdinal(colID) + if err != nil { + return exec.NodeColumnOrdinalSet{}, err + } + res.Add(int(colOrd)) + } + return res, nil } // reqOrdering converts the provided ordering of a relational expression to an // OutputOrdering (according to the outputCols map). -func (ep *execPlan) reqOrdering(expr memo.RelExpr) exec.OutputOrdering { - return exec.OutputOrdering(ep.sqlOrdering(expr.ProvidedPhysical().Ordering)) +func (ep *execPlan) reqOrdering(expr memo.RelExpr) (exec.OutputOrdering, error) { + ordering, err := ep.sqlOrdering(expr.ProvidedPhysical().Ordering) + return exec.OutputOrdering(ordering), err } // sqlOrdering converts an Ordering to a ColumnOrdering (according to the // outputCols map). 
-func (ep *execPlan) sqlOrdering(ordering opt.Ordering) colinfo.ColumnOrdering { +func (ep *execPlan) sqlOrdering(ordering opt.Ordering) (colinfo.ColumnOrdering, error) { if ordering.Empty() { - return nil + return nil, nil } colOrder := make(colinfo.ColumnOrdering, len(ordering)) for i := range ordering { - colOrder[i].ColIdx = int(ep.getNodeColumnOrdinal(ordering[i].ID())) + ord, err := ep.getNodeColumnOrdinal(ordering[i].ID()) + if err != nil { + return nil, err + } + colOrder[i].ColIdx = int(ord) if ordering[i].Descending() { colOrder[i].Direction = encoding.Descending } else { @@ -149,7 +158,7 @@ func (ep *execPlan) sqlOrdering(ordering opt.Ordering) colinfo.ColumnOrdering { } } - return colOrder + return colOrder, nil } func (b *Builder) buildRelational(e memo.RelExpr) (execPlan, error) { @@ -812,11 +821,15 @@ func (b *Builder) buildScan(scan *memo.ScanExpr) (execPlan, error) { return execPlan{}, err } res := execPlan{outputCols: outputCols} + reqOrdering, err := res.reqOrdering(scan) + if err != nil { + return execPlan{}, err + } root, err := b.factory.ConstructScan( tab, tab.Index(scan.Index), params, - res.reqOrdering(scan), + reqOrdering, ) if err != nil { return execPlan{}, err @@ -886,11 +899,15 @@ func (b *Builder) buildPlaceholderScan(scan *memo.PlaceholderScanExpr) (execPlan return execPlan{}, err } res := execPlan{outputCols: outputCols} + reqOrdering, err := res.reqOrdering(scan) + if err != nil { + return execPlan{}, err + } root, err := b.factory.ConstructScan( tab, tab.Index(scan.Index), params, - res.reqOrdering(scan), + reqOrdering, ) if err != nil { return execPlan{}, err @@ -911,7 +928,10 @@ func (b *Builder) buildSelect(sel *memo.SelectExpr) (execPlan, error) { } // A filtering node does not modify the schema. res := execPlan{outputCols: input.outputCols} - reqOrder := res.reqOrdering(sel) + reqOrder, err := res.reqOrdering(sel) + if err != nil { + return execPlan{}, err + } res.root, err = b.factory.ConstructFilter(input.root, filter, reqOrder) if err != nil { return execPlan{}, err @@ -926,7 +946,10 @@ func (b *Builder) buildInvertedFilter(invFilter *memo.InvertedFilterExpr) (execP } // A filtering node does not modify the schema. res := execPlan{outputCols: input.outputCols} - invertedCol := input.getNodeColumnOrdinal(invFilter.InvertedColumn) + invertedCol, err := input.getNodeColumnOrdinal(invFilter.InvertedColumn) + if err != nil { + return execPlan{}, err + } var typedPreFilterExpr tree.TypedExpr var typ *types.T if invFilter.PreFiltererState != nil && invFilter.PreFiltererState.Expr != nil { @@ -968,13 +991,21 @@ func (b *Builder) applySimpleProject( // We have only pass-through columns. 
colList := make([]exec.NodeColumnOrdinal, 0, cols.Len()) var res execPlan - cols.ForEach(func(i opt.ColumnID) { + for i, ok := cols.Next(0); ok; i, ok = cols.Next(i + 1) { res.outputCols.Set(int(i), len(colList)) - colList = append(colList, input.getNodeColumnOrdinal(i)) - }) + ord, err := input.getNodeColumnOrdinal(i) + if err != nil { + return execPlan{}, err + } + colList = append(colList, ord) + } var err error + sqlOrdering, err := res.sqlOrdering(providedOrd) + if err != nil { + return execPlan{}, err + } res.root, err = b.factory.ConstructSimpleProject( - input.root, colList, exec.OutputOrdering(res.sqlOrdering(providedOrd)), + input.root, colList, exec.OutputOrdering(sqlOrdering), ) if err != nil { return execPlan{}, err @@ -1012,17 +1043,23 @@ func (b *Builder) buildProject(prj *memo.ProjectExpr) (execPlan, error) { Typ: item.Typ, }) } - prj.Passthrough.ForEach(func(colID opt.ColumnID) { + for colID, ok := prj.Passthrough.Next(0); ok; colID, ok = prj.Passthrough.Next(colID + 1) { res.outputCols.Set(int(colID), len(exprs)) - indexedVar := b.indexedVar(&ctx, md, colID) + indexedVar, err := b.indexedVar(&ctx, md, colID) + if err != nil { + return execPlan{}, err + } exprs = append(exprs, indexedVar) meta := md.ColumnMeta(colID) cols = append(cols, colinfo.ResultColumn{ Name: meta.Alias, Typ: meta.Type, }) - }) - reqOrdering := res.reqOrdering(prj) + } + reqOrdering, err := res.reqOrdering(prj) + if err != nil { + return execPlan{}, err + } res.root, err = b.factory.ConstructRender(input.root, cols, exprs, reqOrdering) if err != nil { return execPlan{}, err @@ -1036,7 +1073,10 @@ func (b *Builder) buildApplyJoin(join memo.RelExpr) (execPlan, error) { default: return execPlan{}, fmt.Errorf("couldn't execute correlated subquery with op %s", join.Op()) } - joinType := joinOpToJoinType(join.Op()) + joinType, err := joinOpToJoinType(join.Op()) + if err != nil { + return execPlan{}, err + } leftExpr := join.Child(0).(memo.RelExpr) leftProps := leftExpr.Relational() rightExpr := join.Child(1).(memo.RelExpr) @@ -1254,7 +1294,10 @@ func (b *Builder) buildHashJoin(join memo.RelExpr) (execPlan, error) { ) } - joinType := joinOpToJoinType(join.Op()) + joinType, err := joinOpToJoinType(join.Op()) + if err != nil { + return execPlan{}, err + } leftExpr := join.Child(0).(memo.RelExpr) rightExpr := join.Child(1).(memo.RelExpr) filters := join.Child(2).(*memo.FiltersExpr) @@ -1307,8 +1350,14 @@ func (b *Builder) buildHashJoin(join memo.RelExpr) (execPlan, error) { leftEqOrdinals := eqColsBuf[:len(leftEq):len(leftEq)] rightEqOrdinals := eqColsBuf[len(leftEq):] for i := range leftEq { - leftEqOrdinals[i] = left.getNodeColumnOrdinal(leftEq[i]) - rightEqOrdinals[i] = right.getNodeColumnOrdinal(rightEq[i]) + leftEqOrdinals[i], err = left.getNodeColumnOrdinal(leftEq[i]) + if err != nil { + return execPlan{}, err + } + rightEqOrdinals[i], err = right.getNodeColumnOrdinal(rightEq[i]) + if err != nil { + return execPlan{}, err + } } leftEqColsAreKey := leftExpr.Relational().FuncDeps.ColsAreStrictKey(leftEq.ToSet()) @@ -1339,7 +1388,10 @@ func (b *Builder) buildMergeJoin(join *memo.MergeJoinExpr) (execPlan, error) { telemetry.Inc(opt.JoinTypeToUseCounter(join.JoinType)) } - joinType := joinOpToJoinType(join.JoinType) + joinType, err := joinOpToJoinType(join.JoinType) + if err != nil { + return execPlan{}, err + } leftExpr, rightExpr := join.Left, join.Right leftEq, rightEq := join.LeftEq, join.RightEq @@ -1368,10 +1420,19 @@ func (b *Builder) buildMergeJoin(join *memo.MergeJoinExpr) (execPlan, error) { if err 
!= nil { return execPlan{}, err } - leftOrd := left.sqlOrdering(leftEq) - rightOrd := right.sqlOrdering(rightEq) + leftOrd, err := left.sqlOrdering(leftEq) + if err != nil { + return execPlan{}, err + } + rightOrd, err := right.sqlOrdering(rightEq) + if err != nil { + return execPlan{}, err + } ep := execPlan{outputCols: outputCols} - reqOrd := ep.reqOrdering(join) + reqOrd, err := ep.reqOrdering(join) + if err != nil { + return execPlan{}, err + } leftEqColsAreKey := leftExpr.Relational().FuncDeps.ColsAreStrictKey(leftEq.ColSet()) rightEqColsAreKey := rightExpr.Relational().FuncDeps.ColsAreStrictKey(rightEq.ColSet()) b.recordJoinType(joinType) @@ -1435,28 +1496,28 @@ func joinOutputMap(left, right opt.ColMap) opt.ColMap { return res } -func joinOpToJoinType(op opt.Operator) descpb.JoinType { +func joinOpToJoinType(op opt.Operator) (descpb.JoinType, error) { switch op { case opt.InnerJoinOp, opt.InnerJoinApplyOp: - return descpb.InnerJoin + return descpb.InnerJoin, nil case opt.LeftJoinOp, opt.LeftJoinApplyOp: - return descpb.LeftOuterJoin + return descpb.LeftOuterJoin, nil case opt.RightJoinOp: - return descpb.RightOuterJoin + return descpb.RightOuterJoin, nil case opt.FullJoinOp: - return descpb.FullOuterJoin + return descpb.FullOuterJoin, nil case opt.SemiJoinOp, opt.SemiJoinApplyOp: - return descpb.LeftSemiJoin + return descpb.LeftSemiJoin, nil case opt.AntiJoinOp, opt.AntiJoinApplyOp: - return descpb.LeftAntiJoin + return descpb.LeftAntiJoin, nil default: - panic(errors.AssertionFailedf("not a join op %s", redact.Safe(op))) + return 0, errors.AssertionFailedf("not a join op %s", redact.Safe(op)) } } @@ -1471,7 +1532,11 @@ func (b *Builder) buildGroupBy(groupBy memo.RelExpr) (execPlan, error) { groupingColIdx := make([]exec.NodeColumnOrdinal, 0, groupingCols.Len()) for i, ok := groupingCols.Next(0); ok; i, ok = groupingCols.Next(i + 1) { ep.outputCols.Set(int(i), len(groupingColIdx)) - groupingColIdx = append(groupingColIdx, input.getNodeColumnOrdinal(i)) + ord, err := input.getNodeColumnOrdinal(i) + if err != nil { + return execPlan{}, err + } + groupingColIdx = append(groupingColIdx, ord) } aggregations := *groupBy.Child(1).(*memo.AggregationsExpr) @@ -1486,7 +1551,10 @@ func (b *Builder) buildGroupBy(groupBy memo.RelExpr) (execPlan, error) { if !ok { return execPlan{}, errors.AssertionFailedf("only VariableOp args supported") } - filterOrd = input.getNodeColumnOrdinal(filter.Col) + filterOrd, err = input.getNodeColumnOrdinal(filter.Col) + if err != nil { + return execPlan{}, err + } agg = aggFilter.Input } @@ -1508,7 +1576,11 @@ func (b *Builder) buildGroupBy(groupBy memo.RelExpr) (execPlan, error) { if len(constArgs) != 0 { return execPlan{}, errors.Errorf("constant args must come after variable args") } - argCols = append(argCols, input.getNodeColumnOrdinal(variable.Col)) + ord, err := input.getNodeColumnOrdinal(variable.Col) + if err != nil { + return execPlan{}, err + } + argCols = append(argCols, ord) } else { if len(argCols) == 0 { return execPlan{}, errors.Errorf("a constant arg requires at least one variable arg") @@ -1532,10 +1604,18 @@ func (b *Builder) buildGroupBy(groupBy memo.RelExpr) (execPlan, error) { ep.root, err = b.factory.ConstructScalarGroupBy(input.root, aggInfos) } else { groupBy := groupBy.(*memo.GroupByExpr) - groupingColOrder := input.sqlOrdering(ordering.StreamingGroupingColOrdering( + var groupingColOrder colinfo.ColumnOrdering + groupingColOrder, err = input.sqlOrdering(ordering.StreamingGroupingColOrdering( &groupBy.GroupingPrivate, 
&groupBy.RequiredPhysical().Ordering, )) - reqOrdering := ep.reqOrdering(groupBy) + if err != nil { + return execPlan{}, err + } + var reqOrdering exec.OutputOrdering + reqOrdering, err = ep.reqOrdering(groupBy) + if err != nil { + return execPlan{}, err + } orderType := exec.GroupingOrderType(groupBy.GroupingOrderType(&groupBy.RequiredPhysical().Ordering)) ep.root, err = b.factory.ConstructGroupBy( input.root, groupingColIdx, groupingColOrder, aggInfos, reqOrdering, orderType, @@ -1560,17 +1640,27 @@ func (b *Builder) buildDistinct(distinct memo.RelExpr) (execPlan, error) { return execPlan{}, err } - distinctCols := input.getNodeColumnOrdinalSet(private.GroupingCols) + distinctCols, err := input.getNodeColumnOrdinalSet(private.GroupingCols) + if err != nil { + return execPlan{}, err + } var orderedCols exec.NodeColumnOrdinalSet ordering := ordering.StreamingGroupingColOrdering( private, &distinct.RequiredPhysical().Ordering, ) for i := range ordering { - orderedCols.Add(int(input.getNodeColumnOrdinal(ordering[i].ID()))) + ord, err := input.getNodeColumnOrdinal(ordering[i].ID()) + if err != nil { + return execPlan{}, err + } + orderedCols.Add(int(ord)) } ep := execPlan{outputCols: input.outputCols} - reqOrdering := ep.reqOrdering(distinct) + reqOrdering, err := ep.reqOrdering(distinct) + if err != nil { + return execPlan{}, err + } ep.root, err = b.factory.ConstructDistinct( input.root, distinctCols, orderedCols, reqOrdering, private.NullsAreDistinct, private.ErrorOnDup) @@ -1628,14 +1718,18 @@ func (b *Builder) buildGroupByInput(groupBy memo.RelExpr) (execPlan, error) { for colID, ok := neededCols.Next(0); ok; colID, ok = neededCols.Next(colID + 1) { ordinal, ordOk := input.outputCols.Get(int(colID)) if !ordOk { - panic(errors.AssertionFailedf("needed column not produced by group-by input")) + return execPlan{}, + errors.AssertionFailedf("needed column not produced by group-by input") } newOutputCols.Set(int(colID), len(cols)) cols = append(cols, exec.NodeColumnOrdinal(ordinal)) } input.outputCols = newOutputCols - reqOrdering := input.reqOrdering(groupByInput) + reqOrdering, err := input.reqOrdering(groupByInput) + if err != nil { + return execPlan{}, err + } input.root, err = b.factory.ConstructSimpleProject(input.root, cols, reqOrdering) if err != nil { return execPlan{}, err @@ -1701,7 +1795,7 @@ func (b *Builder) buildSetOp(set memo.RelExpr) (execPlan, error) { case opt.ExceptAllOp: typ, all = tree.ExceptOp, true default: - panic(errors.AssertionFailedf("invalid operator %s", redact.Safe(set.Op()))) + return execPlan{}, errors.AssertionFailedf("invalid operator %s", redact.Safe(set.Op())) } switch typ { @@ -1734,10 +1828,16 @@ func (b *Builder) buildSetOp(set memo.RelExpr) (execPlan, error) { for i, col := range private.OutCols { ep.outputCols.Set(int(col), i) } - streamingOrdering := ep.sqlOrdering( + streamingOrdering, err := ep.sqlOrdering( ordering.StreamingSetOpOrdering(set, &set.RequiredPhysical().Ordering), ) - reqOrdering := ep.reqOrdering(set) + if err != nil { + return execPlan{}, err + } + reqOrdering, err := ep.reqOrdering(set) + if err != nil { + return execPlan{}, err + } if typ == tree.UnionOp && all { ep.root, err = b.factory.ConstructUnionAll(left.root, right.root, reqOrdering, hardLimit, enforceHomeRegion) @@ -1780,10 +1880,14 @@ func (b *Builder) buildTopK(e *memo.TopKExpr) (execPlan, error) { } alreadyOrderedPrefix = i + 1 } + sqlOrdering, err := input.sqlOrdering(ordering) + if err != nil { + return execPlan{}, err + } node, err := b.factory.ConstructTopK( 
input.root, e.K, - exec.OutputOrdering(input.sqlOrdering(ordering)), + exec.OutputOrdering(sqlOrdering), alreadyOrderedPrefix) if err != nil { return execPlan{}, err @@ -1834,9 +1938,13 @@ func (b *Builder) buildSort(sort *memo.SortExpr) (execPlan, error) { alreadyOrderedPrefix = i + 1 } + sqlOrdering, err := input.sqlOrdering(ordering) + if err != nil { + return execPlan{}, err + } node, err := b.factory.ConstructSort( input.root, - exec.OutputOrdering(input.sqlOrdering(ordering)), + exec.OutputOrdering(sqlOrdering), alreadyOrderedPrefix, ) if err != nil { @@ -2048,7 +2156,10 @@ func (b *Builder) buildIndexJoin(join *memo.IndexJoinExpr) (execPlan, error) { b.IndexesUsed = util.CombineUniqueString(b.IndexesUsed, []string{fmt.Sprintf("%d@%d", tab.ID(), pri.ID())}) keyCols := make([]exec.NodeColumnOrdinal, pri.KeyColumnCount()) for i := range keyCols { - keyCols[i] = input.getNodeColumnOrdinal(join.Table.ColumnID(pri.Column(i).Ordinal())) + keyCols[i], err = input.getNodeColumnOrdinal(join.Table.ColumnID(pri.Column(i).Ordinal())) + if err != nil { + return execPlan{}, err + } } cols := join.Cols @@ -2062,8 +2173,12 @@ func (b *Builder) buildIndexJoin(join *memo.IndexJoinExpr) (execPlan, error) { res := execPlan{outputCols: output} b.recordJoinAlgorithm(exec.IndexJoin) + reqOrdering, err := res.reqOrdering(join) + if err != nil { + return execPlan{}, err + } res.root, err = b.factory.ConstructIndexJoin( - input.root, tab, keyCols, needed, res.reqOrdering(join), locking, join.RequiredPhysical().LimitHintInt64(), + input.root, tab, keyCols, needed, reqOrdering, locking, join.RequiredPhysical().LimitHintInt64(), ) if err != nil { return execPlan{}, err @@ -2287,7 +2402,10 @@ func (b *Builder) buildLookupJoin(join *memo.LookupJoinExpr) (execPlan, error) { } keyCols := make([]exec.NodeColumnOrdinal, len(join.KeyCols)) for i, c := range join.KeyCols { - keyCols[i] = input.getNodeColumnOrdinal(c) + keyCols[i], err = input.getNodeColumnOrdinal(c) + if err != nil { + return execPlan{}, err + } } inputCols := join.Input.Relational().OutputCols @@ -2355,9 +2473,16 @@ func (b *Builder) buildLookupJoin(join *memo.LookupJoinExpr) (execPlan, error) { } b.ContainsNonDefaultKeyLocking = b.ContainsNonDefaultKeyLocking || locking.IsLocking() - joinType := joinOpToJoinType(join.JoinType) + joinType, err := joinOpToJoinType(join.JoinType) + if err != nil { + return execPlan{}, err + } b.recordJoinType(joinType) b.recordJoinAlgorithm(exec.LookupJoin) + reqOrdering, err := res.reqOrdering(join) + if err != nil { + return execPlan{}, err + } res.root, err = b.factory.ConstructLookupJoin( joinType, input.root, @@ -2371,7 +2496,7 @@ func (b *Builder) buildLookupJoin(join *memo.LookupJoinExpr) (execPlan, error) { onExpr, join.IsFirstJoinInPairedJoiner, join.IsSecondJoinInPairedJoiner, - res.reqOrdering(join), + reqOrdering, locking, join.RequiredPhysical().LimitHintInt64(), join.RemoteOnlyLookups, @@ -2519,7 +2644,10 @@ func (b *Builder) buildInvertedJoin(join *memo.InvertedJoinExpr) (execPlan, erro prefixEqCols := make([]exec.NodeColumnOrdinal, len(join.PrefixKeyCols)) for i, c := range join.PrefixKeyCols { - prefixEqCols[i] = input.getNodeColumnOrdinal(c) + prefixEqCols[i], err = input.getNodeColumnOrdinal(c) + if err != nil { + return execPlan{}, err + } } inputCols := join.Input.Relational().OutputCols @@ -2587,9 +2715,16 @@ func (b *Builder) buildInvertedJoin(join *memo.InvertedJoinExpr) (execPlan, erro } b.ContainsNonDefaultKeyLocking = b.ContainsNonDefaultKeyLocking || locking.IsLocking() - joinType := 
joinOpToJoinType(join.JoinType) + joinType, err := joinOpToJoinType(join.JoinType) + if err != nil { + return execPlan{}, err + } b.recordJoinType(joinType) b.recordJoinAlgorithm(exec.InvertedJoin) + reqOrdering, err := res.reqOrdering(join) + if err != nil { + return execPlan{}, err + } res.root, err = b.factory.ConstructInvertedJoin( joinType, invertedExpr, @@ -2600,7 +2735,7 @@ func (b *Builder) buildInvertedJoin(join *memo.InvertedJoinExpr) (execPlan, erro lookupOrdinals, onExpr, join.IsFirstJoinInPairedJoiner, - res.reqOrdering(join), + reqOrdering, locking, ) if err != nil { @@ -2697,6 +2832,10 @@ func (b *Builder) buildZigzagJoin(join *memo.ZigzagJoinExpr) (execPlan, error) { } b.recordJoinAlgorithm(exec.ZigZagJoin) + reqOrdering, err := res.reqOrdering(join) + if err != nil { + return execPlan{}, err + } res.root, err = b.factory.ConstructZigzagJoin( leftTable, leftIndex, @@ -2711,7 +2850,7 @@ func (b *Builder) buildZigzagJoin(join *memo.ZigzagJoinExpr) (execPlan, error) { rightEqCols, rightLocking, onExpr, - res.reqOrdering(join), + reqOrdering, ) if err != nil { return execPlan{}, err @@ -2989,7 +3128,7 @@ func (b *Builder) buildFrame(input execPlan, w *memo.WindowsItem) (*tree.WindowF } if boundExpr, ok := b.extractFromOffset(w.Function); ok { if !b.isOffsetMode(w.Frame.StartBoundType) { - panic(errors.AssertionFailedf("expected offset to only be present in offset mode")) + return nil, errors.AssertionFailedf("expected offset to only be present in offset mode") } offset, err := b.buildScalar(&scalarCtx, boundExpr) if err != nil { @@ -3003,7 +3142,7 @@ func (b *Builder) buildFrame(input execPlan, w *memo.WindowsItem) (*tree.WindowF if boundExpr, ok := b.extractToOffset(w.Function); ok { if !b.isOffsetMode(newDef.Bounds.EndBound.BoundType) { - panic(errors.AssertionFailedf("expected offset to only be present in offset mode")) + return nil, errors.AssertionFailedf("expected offset to only be present in offset mode") } offset, err := b.buildScalar(&scalarCtx, boundExpr) if err != nil { @@ -3051,8 +3190,12 @@ func (b *Builder) buildWindow(w *memo.WindowExpr) (execPlan, error) { if c.Descending() { direction = tree.Descending } + indexedVar, err := b.indexedVar(&ctx, b.mem.Metadata(), c.ID()) + if err != nil { + return execPlan{}, err + } orderingExprs[i] = &tree.Order{ - Expr: b.indexedVar(&ctx, b.mem.Metadata(), c.ID()), + Expr: indexedVar, Direction: direction, } } @@ -3061,12 +3204,16 @@ func (b *Builder) buildWindow(w *memo.WindowExpr) (execPlan, error) { partitionExprs := make(tree.Exprs, w.Partition.Len()) i := 0 - w.Partition.ForEach(func(col opt.ColumnID) { + for col, ok := w.Partition.Next(0); ok; col, ok = w.Partition.Next(col + 1) { ordinal, _ := input.outputCols.Get(int(col)) partitionIdxs[i] = exec.NodeColumnOrdinal(ordinal) - partitionExprs[i] = b.indexedVar(&ctx, b.mem.Metadata(), col) + indexedVar, err := b.indexedVar(&ctx, b.mem.Metadata(), col) + if err != nil { + return execPlan{}, err + } + partitionExprs[i] = indexedVar i++ - }) + } argIdxs := make([][]exec.NodeColumnOrdinal, len(w.Windows)) filterIdxs := make([]int, len(w.Windows)) @@ -3086,7 +3233,11 @@ func (b *Builder) buildWindow(w *memo.WindowExpr) (execPlan, error) { argIdxs[i] = make([]exec.NodeColumnOrdinal, fn.ChildCount()) for j, n := 0, fn.ChildCount(); j < n; j++ { col := fn.Child(j).(*memo.VariableExpr).Col - args[j] = b.indexedVar(&ctx, b.mem.Metadata(), col) + indexedVar, err := b.indexedVar(&ctx, b.mem.Metadata(), col) + if err != nil { + return execPlan{}, err + } + args[j] = indexedVar idx, _ 
:= input.outputCols.Get(int(col)) argIdxs[i][j] = exec.NodeColumnOrdinal(idx) } @@ -3101,7 +3252,8 @@ func (b *Builder) buildWindow(w *memo.WindowExpr) (execPlan, error) { if ok { f, ok := filter.(*memo.VariableExpr) if !ok { - panic(errors.AssertionFailedf("expected FILTER expression to be a VariableExpr")) + return execPlan{}, + errors.AssertionFailedf("expected FILTER expression to be a VariableExpr") } filterIdxs[i], _ = input.outputCols.Get(int(f.Col)) @@ -3118,8 +3270,12 @@ func (b *Builder) buildWindow(w *memo.WindowExpr) (execPlan, error) { OrderBy: orderingExprs, Frame: frame, } + wrappedFn, err := b.wrapFunction(name) + if err != nil { + return execPlan{}, err + } exprs[i] = tree.NewTypedFuncExpr( - b.wrapFunction(name), + wrappedFn, 0, args, builtFilter, @@ -3157,6 +3313,10 @@ func (b *Builder) buildWindow(w *memo.WindowExpr) (execPlan, error) { outputIdxs[i] = windowStart + i } + sqlOrdering, err := input.sqlOrdering(ord) + if err != nil { + return execPlan{}, err + } node, err := b.factory.ConstructWindow(input.root, exec.WindowInfo{ Cols: resultCols, Exprs: exprs, @@ -3164,7 +3324,7 @@ func (b *Builder) buildWindow(w *memo.WindowExpr) (execPlan, error) { ArgIdxs: argIdxs, FilterIdxs: filterIdxs, Partition: partitionIdxs, - Ordering: input.sqlOrdering(ord), + Ordering: sqlOrdering, }) if err != nil { return execPlan{}, err @@ -3233,7 +3393,7 @@ func (b *Builder) buildOpaque(opaque *memo.OpaqueRelPrivate) (execPlan, error) { // the columns (in the same order), returns needProj=false. func (b *Builder) needProjection( input execPlan, colList opt.ColList, -) (_ []exec.NodeColumnOrdinal, needProj bool) { +) (_ []exec.NodeColumnOrdinal, needProj bool, err error) { if input.numOutputCols() == len(colList) { identity := true for i, col := range colList { @@ -3243,16 +3403,20 @@ func (b *Builder) needProjection( } } if identity { - return nil, false + return nil, false, nil } } cols := make([]exec.NodeColumnOrdinal, 0, len(colList)) for _, col := range colList { if col != 0 { - cols = append(cols, input.getNodeColumnOrdinal(col)) + ord, err := input.getNodeColumnOrdinal(col) + if err != nil { + return nil, false, err + } + cols = append(cols, ord) } } - return cols, true + return cols, true, nil } // ensureColumns applies a projection as necessary to make the output match the @@ -3260,7 +3424,10 @@ func (b *Builder) needProjection( func (b *Builder) ensureColumns( input execPlan, inputExpr memo.RelExpr, colList opt.ColList, provided opt.Ordering, ) (execPlan, error) { - cols, needProj := b.needProjection(input, colList) + cols, needProj, err := b.needProjection(input, colList) + if err != nil { + return execPlan{}, err + } if !needProj { return input, nil } @@ -3272,8 +3439,11 @@ func (b *Builder) ensureColumns( for i, col := range colList { res.outputCols.Set(int(col), i) } - reqOrdering := exec.OutputOrdering(res.sqlOrdering(provided)) - var err error + sqlOrdering, err := res.sqlOrdering(provided) + if err != nil { + return execPlan{}, err + } + reqOrdering := exec.OutputOrdering(sqlOrdering) res.root, err = b.factory.ConstructSimpleProject(input.root, cols, reqOrdering) return res, err } @@ -3285,7 +3455,11 @@ func (b *Builder) applyPresentation(input execPlan, pres physical.Presentation) colNames := make([]string, len(pres)) var res execPlan for i := range pres { - cols[i] = input.getNodeColumnOrdinal(pres[i].ID) + ord, err := input.getNodeColumnOrdinal(pres[i].ID) + if err != nil { + return execPlan{}, err + } + cols[i] = ord res.outputCols.Set(int(pres[i].ID), i) colNames[i] = 
pres[i].Alias } @@ -3296,7 +3470,7 @@ func (b *Builder) applyPresentation(input execPlan, pres physical.Presentation) // getEnvData consolidates the information that must be presented in // EXPLAIN (opt, env). -func (b *Builder) getEnvData() exec.ExplainEnvData { +func (b *Builder) getEnvData() (exec.ExplainEnvData, error) { envOpts := exec.ExplainEnvData{ShowEnv: true} var err error envOpts.Tables, envOpts.Sequences, envOpts.Views, err = b.mem.Metadata().AllDataSourceNames( @@ -3305,11 +3479,7 @@ func (b *Builder) getEnvData() exec.ExplainEnvData { return b.catalog.FullyQualifiedName(context.TODO(), ds) }, ) - if err != nil { - panic(err) - } - - return envOpts + return envOpts, err } // statementTag returns a string that can be used in an error message regarding diff --git a/pkg/sql/opt/exec/execbuilder/scalar.go b/pkg/sql/opt/exec/execbuilder/scalar.go index 4bbc14d6e4f4..d813ef0322e8 100644 --- a/pkg/sql/opt/exec/execbuilder/scalar.go +++ b/pkg/sql/opt/exec/execbuilder/scalar.go @@ -137,17 +137,17 @@ func (b *Builder) buildNull(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Ty func (b *Builder) buildVariable( ctx *buildScalarCtx, scalar opt.ScalarExpr, ) (tree.TypedExpr, error) { - return b.indexedVar(ctx, b.mem.Metadata(), *scalar.Private().(*opt.ColumnID)), nil + return b.indexedVar(ctx, b.mem.Metadata(), *scalar.Private().(*opt.ColumnID)) } func (b *Builder) indexedVar( ctx *buildScalarCtx, md *opt.Metadata, colID opt.ColumnID, -) tree.TypedExpr { +) (tree.TypedExpr, error) { idx, ok := ctx.ivarMap.Get(int(colID)) if !ok { - panic(errors.AssertionFailedf("cannot map variable %d to an indexed var", redact.Safe(colID))) + return nil, errors.AssertionFailedf("cannot map variable %d to an indexed var", redact.Safe(colID)) } - return ctx.ivh.IndexedVarWithType(idx, md.ColumnMeta(colID).Type) + return ctx.ivh.IndexedVarWithType(idx, md.ColumnMeta(colID).Type), nil } func (b *Builder) buildTuple(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.TypedExpr, error) { @@ -228,7 +228,7 @@ func (b *Builder) buildBoolean(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree return tree.NewTypedIsNotNullExpr(expr), nil default: - panic(errors.AssertionFailedf("invalid op %s", redact.Safe(scalar.Op()))) + return nil, errors.AssertionFailedf("invalid op %s", redact.Safe(scalar.Op())) } } @@ -294,7 +294,10 @@ func (b *Builder) buildFunction( return nil, err } } - funcRef := b.wrapFunction(fn.Name) + funcRef, err := b.wrapFunction(fn.Name) + if err != nil { + return nil, err + } return tree.NewTypedFuncExpr( funcRef, 0, /* aggQualifier */ @@ -375,7 +378,10 @@ func (b *Builder) buildAssignmentCast( return input, nil } const fnName = "crdb_internal.assignment_cast" - funcRef := b.wrapFunction(fnName) + funcRef, err := b.wrapFunction(fnName) + if err != nil { + return nil, err + } props, overloads := builtinsregistry.GetBuiltinProperties(fnName) return tree.NewTypedFuncExpr( funcRef, @@ -494,7 +500,7 @@ func (b *Builder) buildArrayFlatten( // The subquery here should always be uncorrelated: if it were not, we would // have converted it to an aggregation. if !af.Input.Relational().OuterCols.Empty() { - panic(errors.AssertionFailedf("input to ArrayFlatten should be uncorrelated")) + return nil, errors.AssertionFailedf("input to ArrayFlatten should be uncorrelated") } if b.planLazySubqueries { @@ -634,7 +640,11 @@ func (b *Builder) buildExistsSubquery( // arguments of the routine. 
args := make(tree.TypedExprs, len(params)) for i := range args { - args[i] = b.indexedVar(ctx, b.mem.Metadata(), params[i]) + indexedVar, err := b.indexedVar(ctx, b.mem.Metadata(), params[i]) + if err != nil { + return nil, err + } + args[i] = indexedVar } // Create a new column for the boolean result. @@ -734,7 +744,11 @@ func (b *Builder) buildSubquery( // The arguments are indexed variables representing the outer columns. args := make(tree.TypedExprs, len(params)) for i := range args { - args[i] = b.indexedVar(ctx, b.mem.Metadata(), params[i]) + indexedVar, err := b.indexedVar(ctx, b.mem.Metadata(), params[i]) + if err != nil { + return nil, err + } + args[i] = indexedVar } // Create a single-element RelListExpr representing the subquery. diff --git a/pkg/sql/opt/exec/execbuilder/statement.go b/pkg/sql/opt/exec/execbuilder/statement.go index e2830c3004f7..eceb09420ce4 100644 --- a/pkg/sql/opt/exec/execbuilder/statement.go +++ b/pkg/sql/opt/exec/execbuilder/statement.go @@ -138,7 +138,11 @@ func (b *Builder) buildExplainOpt(explain *memo.ExplainExpr) (execPlan, error) { // tell the exec factory what information it needs to fetch. var envOpts exec.ExplainEnvData if explain.Options.Flags[tree.ExplainFlagEnv] { - envOpts = b.getEnvData() + var err error + envOpts, err = b.getEnvData() + if err != nil { + return execPlan{}, err + } } node, err := b.factory.ConstructExplainOpt(planText.String(), envOpts) @@ -397,7 +401,10 @@ func (b *Builder) buildExport(export *memo.ExportExpr) (execPlan, error) { return execPlan{}, err } } - notNullColsSet := input.getNodeColumnOrdinalSet(export.Input.Relational().NotNullCols) + notNullColsSet, err := input.getNodeColumnOrdinalSet(export.Input.Relational().NotNullCols) + if err != nil { + return execPlan{}, err + } node, err := b.factory.ConstructExport( input.root, From cf086e76a32332a5173283a10fdf76830939cc01 Mon Sep 17 00:00:00 2001 From: Arul Ajmani Date: Fri, 31 Mar 2023 16:01:26 -0400 Subject: [PATCH 04/12] kvserver: nudge replicate queue on span config update We eagerly enqueue replicas into the split/merge queues whenever there is a span config update. A span config update could also imply a need to {up,down}replicate as well. This patch actively enqueues overlapping replicas into the replicate queue as well. Epic: none Release note: None --- pkg/kv/kvserver/store.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 231a3d6cdec1..f5c5e3cbde64 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2381,13 +2381,17 @@ func (s *Store) onSpanConfigUpdate(ctx context.Context, updated roachpb.Span) { // TODO(irfansharif): For symmetry with the system config span variant, // we queue blindly; we could instead only queue it if we knew the // range's keyspans has a split in there somewhere, or was now part of a - // larger range and eligible for a merge. + // larger range and eligible for a merge, or the span config implied a + // need for {up,down}replication. 
s.splitQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { h.MaybeAdd(ctx, repl, now) }) s.mergeQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { h.MaybeAdd(ctx, repl, now) }) + s.replicateQueue.Async(replCtx, "span config update", true /* wait */, func(ctx context.Context, h queueHelper) { + h.MaybeAdd(ctx, repl, now) + }) return nil // more }, ); err != nil { From e279bb1330de06caa1f81444f5769626f70b15fc Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Fri, 31 Mar 2023 17:49:12 -0400 Subject: [PATCH 05/12] sql: fix read-only SSL var The variable will now look at the connection state to determine if SSL/TLS is being used, rather than relying on server configuration params, which aren't sufficient to be able to determine the type of connection. Release note: None --- pkg/sql/conn_executor.go | 1 + pkg/sql/exec_util.go | 1 + pkg/sql/pgwire/auth_test.go | 48 +++++++++++++++++++++++++++++ pkg/sql/pgwire/pre_serve.go | 1 + pkg/sql/pgwire/server.go | 3 +- pkg/sql/sessiondata/session_data.go | 7 +++-- pkg/sql/vars.go | 3 +- 7 files changed, 58 insertions(+), 6 deletions(-) diff --git a/pkg/sql/conn_executor.go b/pkg/sql/conn_executor.go index 7bd7578d2b78..6b23641e4b76 100644 --- a/pkg/sql/conn_executor.go +++ b/pkg/sql/conn_executor.go @@ -880,6 +880,7 @@ func newSessionData(args SessionArgs) *sessiondata.SessionData { }, LocalUnmigratableSessionData: sessiondata.LocalUnmigratableSessionData{ RemoteAddr: args.RemoteAddr, + IsSSL: args.IsSSL, }, LocalOnlySessionData: sessiondatapb.LocalOnlySessionData{ ResultsBufferSize: args.ConnResultsBufferSize, diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index b37ec31d68b3..7b589fb23738 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -2119,6 +2119,7 @@ type SessionDefaults map[string]string type SessionArgs struct { User username.SQLUsername IsSuperuser bool + IsSSL bool SystemIdentity username.SQLUsername SessionDefaults SessionDefaults CustomOptionSessionDefaults SessionDefaults diff --git a/pkg/sql/pgwire/auth_test.go b/pkg/sql/pgwire/auth_test.go index c2fb20baf9df..4858105c567c 100644 --- a/pkg/sql/pgwire/auth_test.go +++ b/pkg/sql/pgwire/auth_test.go @@ -48,6 +48,7 @@ import ( "github.com/cockroachdb/errors" "github.com/cockroachdb/errors/stdstrings" "github.com/cockroachdb/redact" + pgx "github.com/jackc/pgx/v4" "github.com/lib/pq" "github.com/stretchr/testify/require" ) @@ -773,4 +774,51 @@ func TestClientAddrOverride(t *testing.T) { } } +// TestSSLSessionVar checks that the read-only SSL session variable correctly +// reflects the state of the connection. +func TestSSLSessionVar(t *testing.T) { + defer leaktest.AfterTest(t)() + sc := log.ScopeWithoutShowLogs(t) + defer sc.Close(t) + + // Start a server. + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + s.(*server.TestServer).Cfg.AcceptSQLWithoutTLS = true + ctx := context.Background() + defer s.Stopper().Stop(ctx) + + // Ensure the test user exists. 
+ if _, err := db.Exec(fmt.Sprintf(`CREATE USER %s WITH PASSWORD 'abc'`, username.TestUser)); err != nil { + t.Fatal(err) + } + + pgURLWithCerts, cleanupFuncCerts := sqlutils.PGUrlWithOptionalClientCerts( + t, s.ServingSQLAddr(), "TestSSLSessionVarCerts" /* prefix */, url.User(username.TestUser), true, + ) + defer cleanupFuncCerts() + + pgURLWithoutCerts, cleanupFuncWithoutCerts := sqlutils.PGUrlWithOptionalClientCerts( + t, s.ServingSQLAddr(), "TestSSLSessionVarNoCerts" /* prefix */, url.UserPassword(username.TestUser, "abc"), false, + ) + defer cleanupFuncWithoutCerts() + q := pgURLWithoutCerts.Query() + q.Set("sslmode", "disable") + pgURLWithoutCerts.RawQuery = q.Encode() + + // Connect with certs. + connWithCerts, err := pgx.Connect(ctx, pgURLWithCerts.String()) + require.NoError(t, err) + var result string + err = connWithCerts.QueryRow(ctx, "SHOW ssl").Scan(&result) + require.NoError(t, err) + require.Equal(t, "on", result) + + // Connect without certs. + connWithoutCerts, err := pgx.Connect(ctx, pgURLWithoutCerts.String()) + require.NoError(t, err) + err = connWithoutCerts.QueryRow(ctx, "SHOW ssl").Scan(&result) + require.NoError(t, err) + require.Equal(t, "off", result) +} + var sessionTerminatedRe = regexp.MustCompile("client_session_end") diff --git a/pkg/sql/pgwire/pre_serve.go b/pkg/sql/pgwire/pre_serve.go index 4e1da9ee5b6f..2d5dcfa11c6e 100644 --- a/pkg/sql/pgwire/pre_serve.go +++ b/pkg/sql/pgwire/pre_serve.go @@ -404,6 +404,7 @@ func (s *PreServeConnHandler) PreServe( st.Reserved.Close(ctx) return conn, st, s.sendErr(ctx, conn, err) } + st.clientParameters.IsSSL = st.ConnType == hba.ConnHostSSL st.State = PreServeReady return conn, st, nil diff --git a/pkg/sql/pgwire/server.go b/pkg/sql/pgwire/server.go index 485aba21bceb..164feb30909a 100644 --- a/pkg/sql/pgwire/server.go +++ b/pkg/sql/pgwire/server.go @@ -746,8 +746,7 @@ func (s *Server) ServeConn( return s.sendErr(ctx, st, conn, newAdminShutdownErr(ErrDrainingNewConn)) } - sArgs, err := finalizeClientParameters(ctx, preServeStatus.clientParameters, - &st.SV) + sArgs, err := finalizeClientParameters(ctx, preServeStatus.clientParameters, &st.SV) if err != nil { preServeStatus.Reserved.Close(ctx) return s.sendErr(ctx, st, conn, err) diff --git a/pkg/sql/sessiondata/session_data.go b/pkg/sql/sessiondata/session_data.go index aaee61e756d8..e56b8146fd96 100644 --- a/pkg/sql/sessiondata/session_data.go +++ b/pkg/sql/sessiondata/session_data.go @@ -169,12 +169,15 @@ type LocalUnmigratableSessionData struct { // dependencies. Temporary tables are not supported in session migrations. DatabaseIDToTempSchemaID map[uint32]uint32 - /////////////////////////////////////////////////////////////////////////// + // IsSSL indicates whether the session is using SSL/TLS. + IsSSL bool + + // //////////////////////////////////////////////////////////////////////// // WARNING: consider whether a session parameter you're adding needs to // // be propagated to the remote nodes or needs to persist amongst session // // migrations. If so, they should live in the LocalOnlySessionData or // // SessionData protobuf in the sessiondatapb package. 
// - /////////////////////////////////////////////////////////////////////////// + // //////////////////////////////////////////////////////////////////////// } // IsTemporarySchemaID returns true if the given ID refers to any of the temp diff --git a/pkg/sql/vars.go b/pkg/sql/vars.go index 9eeececed6e1..e5560b65ee2c 100644 --- a/pkg/sql/vars.go +++ b/pkg/sql/vars.go @@ -1330,8 +1330,7 @@ var varGen = map[string]sessionVar{ `ssl`: { Hidden: true, Get: func(evalCtx *extendedEvalContext, _ *kv.Txn) (string, error) { - insecure := evalCtx.ExecCfg.RPCContext.Config.Insecure || evalCtx.ExecCfg.RPCContext.Config.AcceptSQLWithoutTLS - return formatBoolAsPostgresSetting(!insecure), nil + return formatBoolAsPostgresSetting(evalCtx.SessionData().IsSSL), nil }, }, From 75c083ca4e54297c38ef73863300250140ea92e5 Mon Sep 17 00:00:00 2001 From: ajwerner Date: Wed, 15 Mar 2023 13:15:04 -0400 Subject: [PATCH 06/12] upgrades: make the schema telemetry job setup permanent This migration should run for every cluster. Epic: none Informs: https://github.com/cockroachdb/cockroach/issues/96763 Release note: None --- pkg/cli/testdata/doctor/test_examine_cluster | 2 +- pkg/clusterversion/cockroach_versions.go | 8 +++-- .../schematelemetrycontroller/BUILD.bazel | 1 - .../schematelemetrycontroller/controller.go | 12 ++------ ...sure_sql_schema_telemetry_schedule_test.go | 29 +++++-------------- pkg/upgrade/upgrades/upgrades.go | 6 ++-- 6 files changed, 18 insertions(+), 40 deletions(-) diff --git a/pkg/cli/testdata/doctor/test_examine_cluster b/pkg/cli/testdata/doctor/test_examine_cluster index a38c001051db..9ef1865168d3 100644 --- a/pkg/cli/testdata/doctor/test_examine_cluster +++ b/pkg/cli/testdata/doctor/test_examine_cluster @@ -3,5 +3,5 @@ debug doctor examine cluster debug doctor examine cluster Examining 57 descriptors and 56 namespace entries... ParentID 100, ParentSchemaID 101: relation "foo" (105): expected matching namespace entry, found none -Examining 18 jobs... +Examining 19 jobs... ERROR: validation failed diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index afb005d65294..d21e3112c613 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -224,9 +224,11 @@ const ( TODODelete_V22_2SystemUsersIDColumnIsBackfilled // TODODelete_V22_2SetSystemUsersUserIDColumnNotNull sets the user_id column in system.users to not null. TODODelete_V22_2SetSystemUsersUserIDColumnNotNull - // TODODelete_V22_2SQLSchemaTelemetryScheduledJobs adds an automatic schedule for SQL schema + // Permanent_V22_2SQLSchemaTelemetryScheduledJobs adds an automatic schedule for SQL schema // telemetry logging jobs. - TODODelete_V22_2SQLSchemaTelemetryScheduledJobs + // + // This is a permanent migration which should exist forever. + Permanent_V22_2SQLSchemaTelemetryScheduledJobs // TODODelete_V22_2SchemaChangeSupportsCreateFunction adds support of CREATE FUNCTION // statement. 
TODODelete_V22_2SchemaChangeSupportsCreateFunction @@ -655,7 +657,7 @@ var rawVersionsSingleton = keyedVersions{ Version: roachpb.Version{Major: 22, Minor: 1, Internal: 40}, }, { - Key: TODODelete_V22_2SQLSchemaTelemetryScheduledJobs, + Key: Permanent_V22_2SQLSchemaTelemetryScheduledJobs, Version: roachpb.Version{Major: 22, Minor: 1, Internal: 42}, }, { diff --git a/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/BUILD.bazel b/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/BUILD.bazel index 1bf71e55fc41..e2c159e23397 100644 --- a/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/BUILD.bazel +++ b/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/BUILD.bazel @@ -25,7 +25,6 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry/schematelemetrycontroller", visibility = ["//visibility:public"], deps = [ - "//pkg/clusterversion", "//pkg/jobs", "//pkg/jobs/jobspb", "//pkg/scheduledjobs", diff --git a/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/controller.go b/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/controller.go index afe7e98bdc0c..ab4a4317811c 100644 --- a/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/controller.go +++ b/pkg/sql/catalog/schematelemetry/schematelemetrycontroller/controller.go @@ -17,7 +17,6 @@ import ( "math/rand" "time" - "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" "github.com/cockroachdb/cockroach/pkg/scheduledjobs" @@ -134,9 +133,8 @@ func (c *Controller) Start(ctx context.Context, stopper *stop.Stopper) { }) } // Trigger a schedule update to ensure it exists at startup. - if c.st.Version.IsActive(ctx, clusterversion.TODODelete_V22_2SQLSchemaTelemetryScheduledJobs) { - notify("ensure-at-startup") - } + notify("ensure-at-startup") + // Add a change hook on the recurrence cluster setting that will notify // a schedule update. 
SchemaTelemetryRecurrence.SetOnChange(&c.st.SV, func(ctx context.Context) { @@ -145,9 +143,6 @@ func (c *Controller) Start(ctx context.Context, stopper *stop.Stopper) { } func updateSchedule(ctx context.Context, db isql.DB, st *cluster.Settings, clusterID uuid.UUID) { - if !st.Version.IsActive(ctx, clusterversion.TODODelete_V22_2SQLSchemaTelemetryScheduledJobs) { - log.Infof(ctx, "failed to update SQL schema telemetry schedule: %s", ErrVersionGate) - } retryOptions := retry.Options{ InitialBackoff: time.Second, MaxBackoff: 10 * time.Minute, @@ -223,9 +218,6 @@ var cronExprRewrites = map[string]func(r *rand.Rand) string{ func (c *Controller) CreateSchemaTelemetryJob( ctx context.Context, createdByName string, createdByID int64, ) (id int64, _ error) { - if !c.st.Version.IsActive(ctx, clusterversion.TODODelete_V22_2SQLSchemaTelemetryScheduledJobs) { - return 0, ErrVersionGate - } var j *jobs.Job if err := c.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) (err error) { r := CreateSchemaTelemetryJobRecord(createdByName, createdByID) diff --git a/pkg/upgrade/upgrades/ensure_sql_schema_telemetry_schedule_test.go b/pkg/upgrade/upgrades/ensure_sql_schema_telemetry_schedule_test.go index a964142d67c3..6389981d8332 100644 --- a/pkg/upgrade/upgrades/ensure_sql_schema_telemetry_schedule_test.go +++ b/pkg/upgrade/upgrades/ensure_sql_schema_telemetry_schedule_test.go @@ -18,17 +18,14 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobstest" "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/server" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/catalog/schematelemetry/schematelemetrycontroller" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins/builtinconstants" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/testutils" - "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -41,8 +38,6 @@ import ( func TestSchemaTelemetrySchedule(t *testing.T) { defer leaktest.AfterTest(t)() - skip.WithIssue(t, 95530, "bump minBinary to 22.2. Skip 22.2 mixed-version tests for future cleanup") - // We want to ensure that the migration will succeed when run again. // To ensure that it will, we inject a failure when trying to mark // the upgrade as complete when forceRetry is true. @@ -75,10 +70,7 @@ func TestSchemaTelemetrySchedule(t *testing.T) { args.Knobs.SchemaTelemetry = &sql.SchemaTelemetryTestingKnobs{ AOSTDuration: &aostDuration, } - args.Knobs.Server = &server.TestingKnobs{ - DisableAutomaticVersionUpgrade: make(chan struct{}), - BinaryVersionOverride: clusterversion.ByKey(clusterversion.TODODelete_V22_2SQLSchemaTelemetryScheduledJobs - 1), - } + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ServerArgs: args}) defer tc.Stopper().Stop(ctx) tdb := sqlutils.MakeSQLRunner(tc.ServerConn(0)) @@ -93,26 +85,19 @@ func TestSchemaTelemetrySchedule(t *testing.T) { qJob := fmt.Sprintf(`SELECT %s()`, builtinconstants.CreateSchemaTelemetryJobBuiltinName) - // Check that there is no schema telemetry schedule and that creating schema - // telemetry jobs is not possible. 
- tdb.CheckQueryResults(t, qExists, [][]string{}) - tdb.ExpectErr(t, schematelemetrycontroller.ErrVersionGate.Error(), qJob) - - // Upgrade the cluster. - tdb.Exec(t, `SET CLUSTER SETTING version = $1`, - clusterversion.ByKey(clusterversion.TODODelete_V22_2SQLSchemaTelemetryScheduledJobs).String()) + clusterID := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).NodeInfo. + LogicalClusterID() - // Check that the schedule now exists and that jobs can be created. + // Check that the schedule exists and that jobs can be created. tdb.Exec(t, qJob) - tdb.CheckQueryResultsRetry(t, qExists, [][]string{{"@weekly", "1"}}) + exp := schematelemetrycontroller.MaybeRewriteCronExpr(clusterID, "@weekly") + tdb.CheckQueryResultsRetry(t, qExists, [][]string{{exp, "1"}}) // Check that the schedule can have its recurrence altered. tdb.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING %s = '* * * * *'`, schematelemetrycontroller.SchemaTelemetryRecurrence.Key())) tdb.CheckQueryResultsRetry(t, qExists, [][]string{{"* * * * *", "1"}}) - clusterID := tc.Server(0).ExecutorConfig().(sql.ExecutorConfig).NodeInfo. - LogicalClusterID() - exp := schematelemetrycontroller.MaybeRewriteCronExpr(clusterID, "@daily") + exp = schematelemetrycontroller.MaybeRewriteCronExpr(clusterID, "@daily") tdb.Exec(t, fmt.Sprintf(`SET CLUSTER SETTING %s = '@daily'`, schematelemetrycontroller.SchemaTelemetryRecurrence.Key())) tdb.CheckQueryResultsRetry(t, qExists, [][]string{{exp, "1"}}) diff --git a/pkg/upgrade/upgrades/upgrades.go b/pkg/upgrade/upgrades/upgrades.go index ce9f94e24214..d1a0c897ee11 100644 --- a/pkg/upgrade/upgrades/upgrades.go +++ b/pkg/upgrade/upgrades/upgrades.go @@ -111,11 +111,11 @@ var upgrades = []upgradebase.Upgrade{ upgrade.NoPrecondition, systemExternalConnectionsTableMigration, ), - upgrade.NewTenantUpgrade( + upgrade.NewPermanentTenantUpgrade( "add default SQL schema telemetry schedule", - toCV(clusterversion.TODODelete_V22_2SQLSchemaTelemetryScheduledJobs), - upgrade.NoPrecondition, + toCV(clusterversion.Permanent_V22_2SQLSchemaTelemetryScheduledJobs), ensureSQLSchemaTelemetrySchedule, + "add default SQL schema telemetry schedule", ), upgrade.NewTenantUpgrade( "wait for all in-flight schema changes", From 2b9d291a3fc008fd73087a617e6b71839c32c7e9 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Thu, 23 Mar 2023 22:55:08 +0000 Subject: [PATCH 07/12] metric: extend manual histogram to support rotate This commit extends the ManualWindowHistogram to support RecordValue and Rotate. Previously, it was necessary to maintain duplicate cumulative histograms in order to batch update the manual histogram. This update adds a quality of life feature, enabling recording to the ManualWindowHistogram, then once finished, rotating the batch of recorded values into the current window for the internal tsdb to query. 
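As an illustrative sketch only (the metadata, bucket choice, and surrounding program are placeholders and not part of this change), the rotate-based flow looks roughly like this:

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/metric"
)

func main() {
	// withRotate=true selects the RecordValue/Rotate mode; Update may not be
	// used on such a histogram.
	h := metric.NewManualWindowHistogram(
		metric.Metadata{Name: "example.duration.nanos"}, // placeholder metadata
		metric.IOLatencyBuckets,
		true, /* withRotate */
	)

	// Batch recordings into the cumulative histogram.
	for _, v := range []float64{1e6, 2e6, 4e6} {
		h.RecordValue(v)
	}

	// Rotate folds everything recorded since the last rotation into the
	// current window, which the windowed getters (and the internal tsdb)
	// then observe.
	if err := h.Rotate(); err != nil {
		panic(err)
	}
	fmt.Println(h.ValueAtQuantileWindowed(99))
}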
Touches: #98266 Release note: None --- pkg/kv/kvserver/metrics.go | 6 +- pkg/kv/kvserver/store.go | 23 ++---- pkg/util/metric/metric.go | 128 ++++++++++++++++++++++++++------- pkg/util/metric/metric_test.go | 6 ++ 4 files changed, 118 insertions(+), 45 deletions(-) diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index ad2c387acb76..e184c7d4c5db 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -2757,7 +2757,11 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { Buckets: metric.IOLatencyBuckets, }), FlushUtilization: metric.NewGaugeFloat64(metaStorageFlushUtilization), - FsyncLatency: metric.NewManualWindowHistogram(metaStorageFsyncLatency, pebble.FsyncLatencyBuckets), + FsyncLatency: metric.NewManualWindowHistogram( + metaStorageFsyncLatency, + pebble.FsyncLatencyBuckets, + false, /* withRotate */ + ), ReplicaReadBatchDroppedLatchesBeforeEval: metric.NewCounter(metaReplicaReadBatchDroppedLatchesBeforeEval), ReplicaReadBatchWithoutInterleavingIter: metric.NewCounter(metaReplicaReadBatchWithoutInterleavingIter), diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 231a3d6cdec1..85abac4dcf1c 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -3215,7 +3215,10 @@ func (s *Store) ComputeMetricsPeriodically( if err != nil { return m, err } - windowFsyncLatency = subtractPrometheusMetrics(windowFsyncLatency, prevFsync) + metric.SubtractPrometheusHistograms( + windowFsyncLatency.GetHistogram(), + prevFsync.GetHistogram(), + ) s.metrics.FsyncLatency.Update(m.LogWriter.FsyncLatency, windowFsyncLatency.Histogram) } @@ -3250,24 +3253,6 @@ func (s *Store) ComputeMetricsPeriodically( return m, nil } -func subtractPrometheusMetrics( - curFsync *prometheusgo.Metric, prevFsync prometheusgo.Metric, -) *prometheusgo.Metric { - prevBuckets := prevFsync.Histogram.GetBucket() - curBuckets := curFsync.Histogram.GetBucket() - - *curFsync.Histogram.SampleCount -= prevFsync.Histogram.GetSampleCount() - *curFsync.Histogram.SampleSum -= prevFsync.Histogram.GetSampleSum() - - for idx, v := range prevBuckets { - if *curBuckets[idx].UpperBound != *v.UpperBound { - panic("Bucket Upperbounds don't match") - } - *curBuckets[idx].CumulativeCount -= *v.CumulativeCount - } - return curFsync -} - // ComputeMetrics immediately computes the current value of store metrics which // cannot be computed incrementally. This method should be invoked periodically // by a higher-level system which records store metrics. diff --git a/pkg/util/metric/metric.go b/pkg/util/metric/metric.go index 3b3b806404fd..65ef4daf90d7 100644 --- a/pkg/util/metric/metric.go +++ b/pkg/util/metric/metric.go @@ -436,45 +436,123 @@ var _ WindowedHistogram = (*ManualWindowHistogram)(nil) // NewManualWindowHistogram is a prometheus-backed histogram. Depending on the // value of the buckets parameter, this is suitable for recording any kind of -// quantity. The histogram is very similar to Histogram produced by NewHistogram -// with the main difference being that Histogram supports collecting values over -// time using the Histogram.RecordValue whereas this histogram does not support -// collecting values. Instead, this histogram supports replacing the cumulative -// and windowed histogram. This means that it is the responsibility of the -// creator of this histogram to replace the values by calling -// ManualWindowHistogram.Update. -func NewManualWindowHistogram(meta Metadata, buckets []float64) *ManualWindowHistogram { +// quantity. 
The histogram is very similar to Histogram produced by +// NewHistogram with the main difference being that Histogram supports +// collecting values over time using the Histogram.RecordValue whereas this +// histogram provides limited support RecordValue, the caller is responsible +// for calling Rotate, after recording is complete or manually providing the +// cumulative and current windowed histogram via Update. This means that it is +// the responsibility of the creator of this histogram to replace the values by +// either calling ManualWindowHistogram.Update or +// ManualWindowHistogram.RecordValue and ManualWindowHistogram.Rotate. If +// NewManualWindowHistogram is called withRotate as true, only the RecordValue +// and Rotate method may be used; withRotate as false, only Update may be used. +// TODO(kvoli,aadityasondhi): The two ways to use this histogram is a hack and +// "temporary", rationalize the interface. Tracked in #98622. +func NewManualWindowHistogram( + meta Metadata, buckets []float64, withRotate bool, +) *ManualWindowHistogram { opts := prometheus.HistogramOpts{ Buckets: buckets, } cum := prometheus.NewHistogram(opts) + prev := &prometheusgo.Metric{} + if err := cum.Write(prev); err != nil { + panic(err.Error()) + } + h := &ManualWindowHistogram{ - Metadata: meta, - cum: cum, - windowedHistogram: nil, + Metadata: meta, + rotating: withRotate, + cum: cum, + prev: prev.GetHistogram(), + cur: nil, } return h } // ManualWindowHistogram is a prometheus-backed histogram. Internally there are -// two sets of histograms: one is the cumulative set (i.e. data is never -// evicted) which is a prometheus.Histogram and the other is the windowed set -// (which keeps only recently collected samples) which is a -// prometheusgo.Histogram. Both histograms must be updated by the client by -// calling ManualWindowHistogram.Update. +// three sets of histograms: one is the cumulative set (i.e. data is never +// evicted) which is a prometheus.Histogram, the cumulative histogram value +// when last rotated and the current histogram, which is windowed. Both the +// previous and current histograms are prometheusgo.Histograms. Both histograms +// must be updated by the client by calling either ManualWindowHistogram.Update +// or ManualWindowHistogram.RecordValue and subsequently Rotate. type ManualWindowHistogram struct { Metadata syncutil.RWMutex - cum prometheus.Histogram - windowedHistogram *prometheusgo.Histogram + rotating bool + cum prometheus.Histogram + prev, cur *prometheusgo.Histogram } -// Update replaces the cumulative and window histograms. -func (mwh *ManualWindowHistogram) Update(cum prometheus.Histogram, window *prometheusgo.Histogram) { +// Update replaces the cumulative and current windowed histograms. +func (mwh *ManualWindowHistogram) Update(cum prometheus.Histogram, cur *prometheusgo.Histogram) { mwh.Lock() defer mwh.Unlock() + if mwh.rotating { + panic("Unexpected call to Update with rotate enabled") + } + mwh.cum = cum - mwh.windowedHistogram = window + mwh.cur = cur +} + +// RecordValue records a value to the cumulative histogram. The value is only +// added to the current window histogram once Rotate is called. +func (mwh *ManualWindowHistogram) RecordValue(val float64) { + mwh.Lock() + defer mwh.Unlock() + if !mwh.rotating { + panic("Unexpected call to RecordValue with rotate disabled") + } + mwh.cum.Observe(val) +} + +// SubtractPrometheusHistograms subtracts the prev histogram from the cur +// histogram, in place modifying the cur histogram. 
The bucket boundaries must +// be identical for both prev and cur. +func SubtractPrometheusHistograms(cur *prometheusgo.Histogram, prev *prometheusgo.Histogram) { + prevBuckets := prev.GetBucket() + curBuckets := cur.GetBucket() + + *cur.SampleCount -= prev.GetSampleCount() + *cur.SampleSum -= prev.GetSampleSum() + + for idx, v := range prevBuckets { + if *curBuckets[idx].UpperBound != *v.UpperBound { + panic("Bucket Upperbounds don't match") + } + *curBuckets[idx].CumulativeCount -= *v.CumulativeCount + } +} + +// Rotate sets the current windowed histogram (cur) to be the delta of the +// cumulative histogram at the last rotation (prev) and the cumulative +// histogram currently (cum). +func (mwh *ManualWindowHistogram) Rotate() error { + mwh.Lock() + defer mwh.Unlock() + + if !mwh.rotating { + panic("Unexpected call to RecordValue with rotate disabled") + } + + cur := &prometheusgo.Metric{} + if err := mwh.cum.Write(cur); err != nil { + return err + } + + SubtractPrometheusHistograms(cur.GetHistogram(), mwh.prev) + mwh.cur = cur.GetHistogram() + prev := &prometheusgo.Metric{} + + if err := mwh.cum.Write(prev); err != nil { + return err + } + mwh.prev = prev.GetHistogram() + + return nil } // GetMetadata returns the metric's metadata including the Prometheus @@ -506,14 +584,14 @@ func (mwh *ManualWindowHistogram) ToPrometheusMetric() *prometheusgo.Metric { func (mwh *ManualWindowHistogram) TotalCountWindowed() int64 { mwh.RLock() defer mwh.RUnlock() - return int64(mwh.windowedHistogram.GetSampleCount()) + return int64(mwh.cur.GetSampleCount()) } // TotalSumWindowed implements the WindowedHistogram interface. func (mwh *ManualWindowHistogram) TotalSumWindowed() float64 { mwh.RLock() defer mwh.RUnlock() - return mwh.windowedHistogram.GetSampleSum() + return mwh.cur.GetSampleSum() } // ValueAtQuantileWindowed implements the WindowedHistogram interface. @@ -523,10 +601,10 @@ func (mwh *ManualWindowHistogram) TotalSumWindowed() float64 { func (mwh *ManualWindowHistogram) ValueAtQuantileWindowed(q float64) float64 { mwh.RLock() defer mwh.RUnlock() - if mwh.windowedHistogram == nil { + if mwh.cur == nil { return 0 } - return ValueAtQuantileWindowed(mwh.windowedHistogram, q) + return ValueAtQuantileWindowed(mwh.cur, q) } // A Counter holds a single mutable atomic value. diff --git a/pkg/util/metric/metric_test.go b/pkg/util/metric/metric_test.go index bee57d768b8a..436d58547941 100644 --- a/pkg/util/metric/metric_test.go +++ b/pkg/util/metric/metric_test.go @@ -217,6 +217,7 @@ func TestManualWindowHistogram(t *testing.T) { h := NewManualWindowHistogram( Metadata{}, buckets, + false, /* withRotate */ ) // should return 0 if no observations are made @@ -254,6 +255,11 @@ func TestManualWindowHistogram(t *testing.T) { t.Fatalf("expected differs from actual: %s", pretty.Diff(exp, act)) } + // Rotate and RecordValue are not supported when using Update. See comment on + // NewManualWindowHistogram. + require.Panics(t, func() { h.RecordValue(0) }) + require.Panics(t, func() { _ = h.Rotate() }) + require.Equal(t, 0.0, h.ValueAtQuantileWindowed(0)) require.Equal(t, 1.0, h.ValueAtQuantileWindowed(10)) require.Equal(t, 17.5, h.ValueAtQuantileWindowed(50)) From c55c584e6711a468645a1d7289e57ad2f83ed81c Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Thu, 9 Mar 2023 00:03:16 +0000 Subject: [PATCH 08/12] kvserver: add replica cpu histogram metric This commit introduces a histogram tracking percentiles of replica CPU time. 
Previously, there was only point in time insight into replica CPU distribution via hotranges. This change enables historical timeseries tracking via the metric `rebalancing.replicas.cpunanospersecond`. Part of: #98255 Release note (ops change): The `rebalancing.replicas.cpunanospersecond` histogram metric is added, which provides insight into the distribution of replica CPU usage within a store. --- pkg/kv/kvserver/metrics.go | 27 +++++++++++++++++------ pkg/kv/kvserver/store.go | 8 ++++++- pkg/ts/catalog/chart_catalog.go | 4 ++++ pkg/util/metric/histogram_buckets.go | 23 +++++++++++++++++++ pkg/util/metric/histogram_buckets_test.go | 6 +++++ 5 files changed, 60 insertions(+), 8 deletions(-) diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index e184c7d4c5db..da7a4ef35161 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -366,6 +366,13 @@ var ( Measurement: "Nanoseconds/Sec", Unit: metric.Unit_NANOSECONDS, } + metaRecentReplicaCPUNanosPerSecond = metric.Metadata{ + Name: "rebalancing.replicas.cpunanospersecond", + Help: "Histogram of average CPU nanoseconds spent on processing " + + "replica operations in the last 30 minutes.", + Measurement: "Nanoseconds/Sec", + Unit: metric.Unit_NANOSECONDS, + } // Metric for tracking follower reads. metaFollowerReadsCount = metric.Metadata{ @@ -1873,13 +1880,14 @@ type StoreMetrics struct { Reserved *metric.Gauge // Rebalancing metrics. - AverageQueriesPerSecond *metric.GaugeFloat64 - AverageWritesPerSecond *metric.GaugeFloat64 - AverageReadsPerSecond *metric.GaugeFloat64 - AverageRequestsPerSecond *metric.GaugeFloat64 - AverageWriteBytesPerSecond *metric.GaugeFloat64 - AverageReadBytesPerSecond *metric.GaugeFloat64 - AverageCPUNanosPerSecond *metric.GaugeFloat64 + AverageQueriesPerSecond *metric.GaugeFloat64 + AverageWritesPerSecond *metric.GaugeFloat64 + AverageReadsPerSecond *metric.GaugeFloat64 + AverageRequestsPerSecond *metric.GaugeFloat64 + AverageWriteBytesPerSecond *metric.GaugeFloat64 + AverageReadBytesPerSecond *metric.GaugeFloat64 + AverageCPUNanosPerSecond *metric.GaugeFloat64 + RecentReplicaCPUNanosPerSecond *metric.ManualWindowHistogram // l0SublevelsWindowedMax doesn't get recorded to metrics itself, it maintains // an ad-hoc history for gosipping information for allocator use. l0SublevelsWindowedMax syncutil.AtomicFloat64 @@ -2462,6 +2470,11 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { AverageWriteBytesPerSecond: metric.NewGaugeFloat64(metaAverageWriteBytesPerSecond), AverageReadBytesPerSecond: metric.NewGaugeFloat64(metaAverageReadBytesPerSecond), AverageCPUNanosPerSecond: metric.NewGaugeFloat64(metaAverageCPUNanosPerSecond), + RecentReplicaCPUNanosPerSecond: metric.NewManualWindowHistogram( + metaRecentReplicaCPUNanosPerSecond, + metric.ReplicaCPUTimeBuckets, + true, /* withRotate */ + ), // Follower reads metrics. 
FollowerReadsCount: metric.NewCounter(metaFollowerReadsCount), diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 85abac4dcf1c..dc73eea50479 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2986,7 +2986,9 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { averageReadsPerSecond += loadStats.ReadKeysPerSecond averageReadBytesPerSecond += loadStats.ReadBytesPerSecond averageWriteBytesPerSecond += loadStats.WriteBytesPerSecond - averageCPUNanosPerSecond += loadStats.RaftCPUNanosPerSecond + loadStats.RequestCPUNanosPerSecond + replicaCPUNanosPerSecond := loadStats.RaftCPUNanosPerSecond + loadStats.RequestCPUNanosPerSecond + averageCPUNanosPerSecond += replicaCPUNanosPerSecond + s.metrics.RecentReplicaCPUNanosPerSecond.RecordValue(replicaCPUNanosPerSecond) locks += metrics.LockTableMetrics.Locks totalLockHoldDurationNanos += metrics.LockTableMetrics.TotalLockHoldDurationNanos @@ -3058,6 +3060,10 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { s.metrics.ClosedTimestampMaxBehindNanos.Update(nanos) } + if err := s.metrics.RecentReplicaCPUNanosPerSecond.Rotate(); err != nil { + return err + } + return nil } diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index 0387c2f536dd..a6fdb7f53a1b 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -732,6 +732,10 @@ var charts = []sectionDescription{ Title: "CPU Nanos Used Per Second", Metrics: []string{"rebalancing.cpunanospersecond"}, }, + { + Title: "Replica CPU Nanos Used Per Second", + Metrics: []string{"rebalancing.replicas.cpunanospersecond"}, + }, }, }, { diff --git a/pkg/util/metric/histogram_buckets.go b/pkg/util/metric/histogram_buckets.go index bf9f70579afb..b639e66af3a8 100644 --- a/pkg/util/metric/histogram_buckets.go +++ b/pkg/util/metric/histogram_buckets.go @@ -356,3 +356,26 @@ var MemoryUsage64MBBuckets = []float64{ 17725385.537954, // 18 MB 64000000.000000, // 64 MB } + +var ReplicaCPUTimeBuckets = []float64{ + 500000.000000, // 500µs + 811888.369594, // 811.888µs + 1318325.449365, // 1.318325ms + 2140666.199360, // 2.140666ms + 3475963.980888, // 3.475963ms + 5644189.458423, // 5.644189ms + 9164903.554162, // 9.164903ms + 14881757.208157, // 14.881757ms + 24164651.192859, // 24.164651ms + 39237998.517573, // 39.237998ms + 63713749.285157, // 63.713749ms + 103456904.055739, // 103.456904ms + 167990914.314189, // 167.990914ms + 272779739.058426, // 272.779739ms + 442933395.205041, // 442.933395ms + 719224944.143830, // 719.224944ms + 1167860734.545059, // 1.167860734s + 1896345095.366121, // 1.896345095s + 3079241055.330125, // 3.079241055s + 4999999999.999990, // 4.999999999s +} diff --git a/pkg/util/metric/histogram_buckets_test.go b/pkg/util/metric/histogram_buckets_test.go index 6f28454b89ff..6b6e9dcf4ecb 100644 --- a/pkg/util/metric/histogram_buckets_test.go +++ b/pkg/util/metric/histogram_buckets_test.go @@ -86,4 +86,10 @@ func TestHistogramBuckets(t *testing.T) { exp := prometheus.ExponentialBucketsRange(1, 64e6, 15) verifyAndPrint(t, exp, MemoryUsage64MBBuckets, SIZE) }) + + t.Run("ReplicaCPUTimeBuckets", func(t *testing.T) { + exp := prometheus.ExponentialBucketsRange(50e4 /* 50µs */, 5e9 /* 5s */, 20) + verifyAndPrint(t, exp, ReplicaCPUTimeBuckets, LATENCY) + }) + } From 2d43cb3f9d5b395efaaf21769abe33e7be75d539 Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Mon, 13 Mar 2023 13:32:24 +0000 Subject: [PATCH 09/12] kvserver: correct qps metric measurement The 
`rebalancing.queriespersecond` metric incorrectly used `Keys/Sec` as a measurement. This commit updates the measurement to be `Queries/Sec`, as implied by the name. Release note: None --- pkg/kv/kvserver/metrics.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index da7a4ef35161..040d8058333a 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -327,7 +327,7 @@ var ( metaAverageQueriesPerSecond = metric.Metadata{ Name: "rebalancing.queriespersecond", Help: "Number of kv-level requests received per second by the store, considering the last 30 minutes, as used in rebalancing decisions.", - Measurement: "Keys/Sec", + Measurement: "Queries/Sec", Unit: metric.Unit_COUNT, } metaAverageWritesPerSecond = metric.Metadata{ From 7b6c751d848dacc9fb71d33b5a7d8e243a364c5d Mon Sep 17 00:00:00 2001 From: Austen McClernon Date: Mon, 13 Mar 2023 13:56:28 +0000 Subject: [PATCH 10/12] kvserver: add replica qps histogram metric This commit introduces a histogram tracking percentiles of replica QPS time. Previously, there was only point in time insight into replica QPS distribution via hot ranges. This change enables historical timeseries tracking via the metric `rebalancing.replicas.queriespersecond`. Part of: #98255 Release note (ops change): The `rebalancing.replicas.queriespersecond` histogram metric is added, which provides insight into the distribution of queries per replica within a store. --- pkg/kv/kvserver/metrics.go | 31 ++++++++++++++++++----- pkg/kv/kvserver/store.go | 4 +++ pkg/ts/catalog/chart_catalog.go | 4 +++ pkg/util/metric/histogram_buckets.go | 29 +++++++++++++++++++++ pkg/util/metric/histogram_buckets_test.go | 5 ++++ 5 files changed, 66 insertions(+), 7 deletions(-) diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 040d8058333a..ce180148f8ee 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -373,6 +373,13 @@ var ( Measurement: "Nanoseconds/Sec", Unit: metric.Unit_NANOSECONDS, } + metaRecentReplicaQueriesPerSecond = metric.Metadata{ + Name: "rebalancing.replicas.queriespersecond", + Help: "Histogram of average kv-level requests received per second by " + + "replicas on the store in the last 30 minutes.", + Measurement: "Queries/Sec", + Unit: metric.Unit_COUNT, + } // Metric for tracking follower reads. metaFollowerReadsCount = metric.Metadata{ @@ -1880,14 +1887,19 @@ type StoreMetrics struct { Reserved *metric.Gauge // Rebalancing metrics. - AverageQueriesPerSecond *metric.GaugeFloat64 - AverageWritesPerSecond *metric.GaugeFloat64 - AverageReadsPerSecond *metric.GaugeFloat64 - AverageRequestsPerSecond *metric.GaugeFloat64 - AverageWriteBytesPerSecond *metric.GaugeFloat64 - AverageReadBytesPerSecond *metric.GaugeFloat64 - AverageCPUNanosPerSecond *metric.GaugeFloat64 + AverageQueriesPerSecond *metric.GaugeFloat64 + AverageWritesPerSecond *metric.GaugeFloat64 + AverageReadsPerSecond *metric.GaugeFloat64 + AverageRequestsPerSecond *metric.GaugeFloat64 + AverageWriteBytesPerSecond *metric.GaugeFloat64 + AverageReadBytesPerSecond *metric.GaugeFloat64 + AverageCPUNanosPerSecond *metric.GaugeFloat64 + // NB: Even though we could average the histogram in order to get + // AverageCPUNanosPerSecond from RecentReplicaCPUNanosPerSecond, we duplicate + // both for backwards compatibility since the cost of the gauge is small. + // This includes all replicas, including quiesced ones. 
RecentReplicaCPUNanosPerSecond *metric.ManualWindowHistogram + RecentReplicaQueriesPerSecond *metric.ManualWindowHistogram // l0SublevelsWindowedMax doesn't get recorded to metrics itself, it maintains // an ad-hoc history for gosipping information for allocator use. l0SublevelsWindowedMax syncutil.AtomicFloat64 @@ -2475,6 +2487,11 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { metric.ReplicaCPUTimeBuckets, true, /* withRotate */ ), + RecentReplicaQueriesPerSecond: metric.NewManualWindowHistogram( + metaRecentReplicaQueriesPerSecond, + metric.ReplicaBatchRequestCountBuckets, + true, /* withRotate */ + ), // Follower reads metrics. FollowerReadsCount: metric.NewCounter(metaFollowerReadsCount), diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index dc73eea50479..1403d36c3499 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -2989,6 +2989,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { replicaCPUNanosPerSecond := loadStats.RaftCPUNanosPerSecond + loadStats.RequestCPUNanosPerSecond averageCPUNanosPerSecond += replicaCPUNanosPerSecond s.metrics.RecentReplicaCPUNanosPerSecond.RecordValue(replicaCPUNanosPerSecond) + s.metrics.RecentReplicaQueriesPerSecond.RecordValue(loadStats.QueriesPerSecond) locks += metrics.LockTableMetrics.Locks totalLockHoldDurationNanos += metrics.LockTableMetrics.TotalLockHoldDurationNanos @@ -3063,6 +3064,9 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { if err := s.metrics.RecentReplicaCPUNanosPerSecond.Rotate(); err != nil { return err } + if err := s.metrics.RecentReplicaQueriesPerSecond.Rotate(); err != nil { + return err + } return nil } diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index a6fdb7f53a1b..fead6330d32d 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -736,6 +736,10 @@ var charts = []sectionDescription{ Title: "Replica CPU Nanos Used Per Second", Metrics: []string{"rebalancing.replicas.cpunanospersecond"}, }, + { + Title: "Replica Queries Per Second", + Metrics: []string{"rebalancing.replicas.queriespersecond"}, + }, }, }, { diff --git a/pkg/util/metric/histogram_buckets.go b/pkg/util/metric/histogram_buckets.go index b639e66af3a8..d83f9d3ac065 100644 --- a/pkg/util/metric/histogram_buckets.go +++ b/pkg/util/metric/histogram_buckets.go @@ -379,3 +379,32 @@ var ReplicaCPUTimeBuckets = []float64{ 3079241055.330125, // 3.079241055s 4999999999.999990, // 4.999999999s } + +// ReplicaBatchRequestCountBuckets are prometheus histogram buckets suitable +// for a histogram that records request counts to a replica. NOTE: The default +// load based split threshold is 2500 Requests (>= BatchRequests) when QPS +// splitting is enabled. We don't expect more than 2500 batch requests for a +// replica in most clusters. However with CPU splits (default), this no longer +// holds. 
+var ReplicaBatchRequestCountBuckets = []float64{
+	1.000000,
+	1.664445,
+	2.770377,
+	4.611141,
+	7.674991,
+	12.774602,
+	21.262623,
+	35.390468,
+	58.905491,
+	98.044956,
+	163.190445,
+	271.621536,
+	452.099132,
+	752.494181,
+	1252.485246,
+	2084.692921,
+	3469.856899,
+	5775.386284,
+	9612.813352,
+	16000.000000,
+}
diff --git a/pkg/util/metric/histogram_buckets_test.go b/pkg/util/metric/histogram_buckets_test.go
index 6b6e9dcf4ecb..9bde8336d575 100644
--- a/pkg/util/metric/histogram_buckets_test.go
+++ b/pkg/util/metric/histogram_buckets_test.go
@@ -92,4 +92,9 @@ func TestHistogramBuckets(t *testing.T) {
 		verifyAndPrint(t, exp, ReplicaCPUTimeBuckets, LATENCY)
 	})
 
+	t.Run("ReplicaBatchRequestCountBuckets", func(t *testing.T) {
+		exp := prometheus.ExponentialBucketsRange(1, 16e3, 20)
+		verifyAndPrint(t, exp, ReplicaBatchRequestCountBuckets, "")
+	})
+
 }

From 52ef4d394bde83085a0a4eda0d985808d848862d Mon Sep 17 00:00:00 2001
From: Michael Butler
Date: Thu, 30 Mar 2023 19:33:01 -0400
Subject: [PATCH 11/12] streamingccl: clean up stream ingestion job execution

This patch cleans up the stream ingestion job code by:
1. Separating dist sql processor planning and execution into separate functions.
2. Setting up the processor planning code for a future with job replanning.
3. Separating stream ingestion and completion into separate functions.
4. Cleaning up a few log lines and error messages.

Epic: none

Release note: none
---
 pkg/ccl/streamingccl/streamingest/BUILD.bazel |   2 +-
 .../streamingest/stream_ingestion_dist.go     | 262 ++++++++++++++++++
 .../streamingest/stream_ingestion_job.go      | 180 ++++--------
 .../stream_ingestion_processor_planning.go    | 165 -----------
 4 files changed, 319 insertions(+), 290 deletions(-)
 create mode 100644 pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
 delete mode 100644 pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go

diff --git a/pkg/ccl/streamingccl/streamingest/BUILD.bazel b/pkg/ccl/streamingccl/streamingest/BUILD.bazel
index ccb2b31e5ca4..e87a2088245d 100644
--- a/pkg/ccl/streamingccl/streamingest/BUILD.bazel
+++ b/pkg/ccl/streamingccl/streamingest/BUILD.bazel
@@ -8,11 +8,11 @@ go_library(
         "external_connection.go",
         "metrics.go",
         "stream_ingest_manager.go",
+        "stream_ingestion_dist.go",
         "stream_ingestion_frontier_processor.go",
         "stream_ingestion_job.go",
         "stream_ingestion_planning.go",
         "stream_ingestion_processor.go",
-        "stream_ingestion_processor_planning.go",
     ],
     importpath = "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamingest",
     visibility = ["//visibility:public"],
diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
new file mode 100644
index 000000000000..7d74c2ad46eb
--- /dev/null
+++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_dist.go
@@ -0,0 +1,262 @@
+// Copyright 2020 The Cockroach Authors.
+//
+// Licensed as a CockroachDB Enterprise file under the Cockroach Community
+// License (the "License"); you may not use this file except in compliance with
+// the License.
You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package streamingest + +import ( + "context" + "fmt" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" + "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" + "github.com/cockroachdb/cockroach/pkg/jobs" + "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" + "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/repstream/streampb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/isql" + "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/cockroachdb/logtags" +) + +func startDistIngestion( + ctx context.Context, + execCtx sql.JobExecContext, + ingestionJob *jobs.Job, + client streamclient.Client, +) error { + + details := ingestionJob.Details().(jobspb.StreamIngestionDetails) + progress := ingestionJob.Progress() + + var previousHighWater, heartbeatTimestamp hlc.Timestamp + initialScanTimestamp := details.ReplicationStartTime + // Start from the last checkpoint if it exists. + if h := progress.GetHighWater(); h != nil && !h.IsEmpty() { + previousHighWater = *h + heartbeatTimestamp = previousHighWater + } else { + heartbeatTimestamp = initialScanTimestamp + } + + log.Infof(ctx, "ingestion job %d resumes stream ingestion from start time %s", + ingestionJob.ID(), heartbeatTimestamp) + + streamID := streampb.StreamID(details.StreamID) + updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.InitializingReplication, + fmt.Sprintf("connecting to the producer job %d and resuming a stream replication plan", streamID)) + if err := waitUntilProducerActive(ctx, client, streamID, heartbeatTimestamp, ingestionJob.ID()); err != nil { + return err + } + + log.Infof(ctx, "producer job %d is active, creating a stream replication plan", streamID) + dsp := execCtx.DistSQLPlanner() + + p, planCtx, err := makePlan( + execCtx, + ingestionJob, + details, + client, + previousHighWater, + progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest.Checkpoint, + initialScanTimestamp)(ctx, dsp) + if err != nil { + return err + } + log.Infof(ctx, "starting to run DistSQL flow for stream ingestion job %d", + ingestionJob.ID()) + + execPlan := func(ctx context.Context) error { + ctx = logtags.AddTag(ctx, "stream-ingest-distsql", nil) + + rw := sql.NewRowResultWriter(nil /* rowContainer */) + + var noTxn *kv.Txn + recv := sql.MakeDistSQLReceiver( + ctx, + rw, + tree.Rows, + nil, /* rangeCache */ + noTxn, + nil, /* clockUpdater */ + execCtx.ExtendedEvalContext().Tracing, + ) + defer recv.Release() + + jobsprofiler.StorePlanDiagram(ctx, execCtx.ExecCfg().DistSQLSrv.Stopper, p, execCtx.ExecCfg().InternalDB, + ingestionJob.ID()) + + // Copy the evalCtx, as dsp.Run() might change it. 
+ evalCtxCopy := *execCtx.ExtendedEvalContext() + dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */) + return rw.Err() + } + + updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.Replicating, + "running the SQL flow for the stream ingestion job") + + // TODO(msbutler): Implement automatic replanning in the spirit of changefeed replanning. + return execPlan(ctx) +} + +// TODO (msbutler): this function signature was written to use in automatic job replanning via +// sql.PhysicalPlanChangeChecker(). Actually implement c2c replanning. +func makePlan( + execCtx sql.JobExecContext, + ingestionJob *jobs.Job, + details jobspb.StreamIngestionDetails, + client streamclient.Client, + previousHighWater hlc.Timestamp, + checkpoint jobspb.StreamIngestionCheckpoint, + initialScanTimestamp hlc.Timestamp, +) func(context.Context, *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) { + return func(ctx context.Context, dsp *sql.DistSQLPlanner) (*sql.PhysicalPlan, *sql.PlanningCtx, error) { + jobID := ingestionJob.ID() + log.Infof(ctx, "Re Planning DistSQL flow for stream ingestion job %d", jobID) + + streamID := streampb.StreamID(details.StreamID) + topology, err := client.Plan(ctx, streamID) + if err != nil { + return nil, nil, err + } + err = ingestionJob.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { + md.Progress.GetStreamIngest().StreamAddresses = topology.StreamAddresses() + ju.UpdateProgress(md.Progress) + return nil + }) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to update job progress") + } + + planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanning(ctx, execCtx.ExtendedEvalContext(), execCtx.ExecCfg()) + if err != nil { + return nil, nil, err + } + + streamIngestionSpecs, streamIngestionFrontierSpec, err := constructStreamIngestionPlanSpecs( + streamingccl.StreamAddress(details.StreamAddress), + topology, + sqlInstanceIDs, + initialScanTimestamp, + previousHighWater, + checkpoint, + jobID, + streamID, + topology.SourceTenantID, + details.DestinationTenantID) + if err != nil { + return nil, nil, err + } + if knobs := execCtx.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.AfterReplicationFlowPlan != nil { + knobs.AfterReplicationFlowPlan(streamIngestionSpecs, streamIngestionFrontierSpec) + } + + // Setup a one-stage plan with one proc per input spec. + corePlacement := make([]physicalplan.ProcessorCorePlacement, len(streamIngestionSpecs)) + for i := range streamIngestionSpecs { + corePlacement[i].SQLInstanceID = sqlInstanceIDs[i] + corePlacement[i].Core.StreamIngestionData = streamIngestionSpecs[i] + } + + p := planCtx.NewPhysicalPlan() + p.AddNoInputStage( + corePlacement, + execinfrapb.PostProcessSpec{}, + streamIngestionResultTypes, + execinfrapb.Ordering{}, + ) + + gatewayNodeID, err := execCtx.ExecCfg().NodeInfo.NodeID.OptionalNodeIDErr(48274) + if err != nil { + return nil, nil, err + } + + // The ResultRouters from the previous stage will feed in to the + // StreamIngestionFrontier processor. 
+ p.AddSingleGroupStage(ctx, base.SQLInstanceID(gatewayNodeID), + execinfrapb.ProcessorCoreUnion{StreamIngestionFrontier: streamIngestionFrontierSpec}, + execinfrapb.PostProcessSpec{}, streamIngestionResultTypes) + + p.PlanToStreamColMap = []int{0} + sql.FinalizePlan(ctx, planCtx, p) + return p, planCtx, nil + } +} + +func constructStreamIngestionPlanSpecs( + streamAddress streamingccl.StreamAddress, + topology streamclient.Topology, + sqlInstanceIDs []base.SQLInstanceID, + initialScanTimestamp hlc.Timestamp, + previousHighWater hlc.Timestamp, + checkpoint jobspb.StreamIngestionCheckpoint, + jobID jobspb.JobID, + streamID streampb.StreamID, + sourceTenantID roachpb.TenantID, + destinationTenantID roachpb.TenantID, +) ([]*execinfrapb.StreamIngestionDataSpec, *execinfrapb.StreamIngestionFrontierSpec, error) { + // For each stream partition in the topology, assign it to a node. + streamIngestionSpecs := make([]*execinfrapb.StreamIngestionDataSpec, 0, len(sqlInstanceIDs)) + + trackedSpans := make([]roachpb.Span, 0) + subscribingSQLInstances := make(map[string]uint32) + for i, partition := range topology.Partitions { + // Round robin assign the stream partitions to nodes. Partitions 0 through + // len(nodes) - 1 creates the spec. Future partitions just add themselves to + // the partition addresses. + if i < len(sqlInstanceIDs) { + spec := &execinfrapb.StreamIngestionDataSpec{ + StreamID: uint64(streamID), + JobID: int64(jobID), + PreviousHighWaterTimestamp: previousHighWater, + InitialScanTimestamp: initialScanTimestamp, + Checkpoint: checkpoint, // TODO: Only forward relevant checkpoint info + StreamAddress: string(streamAddress), + PartitionSpecs: make(map[string]execinfrapb.StreamIngestionPartitionSpec), + TenantRekey: execinfrapb.TenantRekey{ + OldID: sourceTenantID, + NewID: destinationTenantID, + }, + } + streamIngestionSpecs = append(streamIngestionSpecs, spec) + } + n := i % len(sqlInstanceIDs) + + subscribingSQLInstances[partition.ID] = uint32(sqlInstanceIDs[n]) + streamIngestionSpecs[n].PartitionSpecs[partition.ID] = execinfrapb.StreamIngestionPartitionSpec{ + PartitionID: partition.ID, + SubscriptionToken: string(partition.SubscriptionToken), + Address: string(partition.SrcAddr), + Spans: partition.Spans, + } + + trackedSpans = append(trackedSpans, partition.Spans...) + } + + // Create a spec for the StreamIngestionFrontier processor on the coordinator + // node. 
+ streamIngestionFrontierSpec := &execinfrapb.StreamIngestionFrontierSpec{ + HighWaterAtStart: previousHighWater, + TrackedSpans: trackedSpans, + JobID: int64(jobID), + StreamID: uint64(streamID), + StreamAddresses: topology.StreamAddresses(), + SubscribingSQLInstances: subscribingSQLInstances, + Checkpoint: checkpoint, + } + + return streamIngestionSpecs, streamIngestionFrontierSpec, nil +} diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go index 216428931d76..628166614045 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go @@ -189,6 +189,51 @@ func updateRunningStatusInternal( ju.UpdateProgress(md.Progress) } +func completeIngestion( + ctx context.Context, + execCtx sql.JobExecContext, + ingestionJob *jobs.Job, + client streamclient.Client, +) error { + log.Infof(ctx, + "reverting to the specified cutover timestamp for stream ingestion job %d", + ingestionJob.ID()) + if err := revertToCutoverTimestamp(ctx, execCtx, ingestionJob); err != nil { + return err + } + details := ingestionJob.Details().(jobspb.StreamIngestionDetails) + log.Infof(ctx, "activating destination tenant %d", details.DestinationTenantID) + if err := activateTenant(ctx, execCtx, details.DestinationTenantID); err != nil { + return err + } + + streamID := details.StreamID + log.Infof(ctx, "completing the producer job %d", streamID) + updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.ReplicationCuttingOver, + "completing the producer job in the source cluster") + // Completes the producer job in the source cluster on best effort. In a real + // disaster recovery scenario, who knows what state the source cluster will be + // in; thus, we should not fail the cutover step on the consumer side if we + // cannot complete the producer job. + if err := client.Complete(ctx, streampb.StreamID(streamID), true /* successfulIngestion */); err != nil { + log.Warningf(ctx, "encountered error when completing the source cluster producer job %d: %s", streamID, err.Error()) + } + // Now that we have completed the cutover we can release the protected + // timestamp record on the destination tenant's keyspace. + if details.ProtectedTimestampRecordID != nil { + if err := execCtx.ExecCfg().InternalDB.Txn(ctx, func( + ctx context.Context, txn isql.Txn, + ) error { + ptp := execCtx.ExecCfg().ProtectedTimestampProvider.WithTxn(txn) + return releaseDestinationTenantProtectedTimestamp( + ctx, ptp, *details.ProtectedTimestampRecordID, + ) + }); err != nil { + return err + } + } + return nil +} func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs.Job) error { // Cutover should be the *first* thing checked upon resumption as it is the // most critical task in disaster recovery. @@ -200,141 +245,28 @@ func ingest(ctx context.Context, execCtx sql.JobExecContext, ingestionJob *jobs. log.Infof(ctx, "job completed cutover on resume") return nil } - if knobs := execCtx.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.BeforeIngestionStart != nil { if err := knobs.BeforeIngestionStart(ctx); err != nil { return err } } - - details := ingestionJob.Details().(jobspb.StreamIngestionDetails) - progress := ingestionJob.Progress() - streamAddress := streamingccl.StreamAddress(details.StreamAddress) - - var previousHighWater, heartbeatTimestamp hlc.Timestamp - initialScanTimestamp := details.ReplicationStartTime - // Start from the last checkpoint if it exists. 
- if h := progress.GetHighWater(); h != nil && !h.IsEmpty() { - previousHighWater = *h - heartbeatTimestamp = previousHighWater - } else { - heartbeatTimestamp = initialScanTimestamp - } - - // Initialize a stream client and resolve topology. client, err := connectToActiveClient(ctx, ingestionJob) if err != nil { return err } - ingestWithClient := func() error { - streamID := streampb.StreamID(details.StreamID) - updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.InitializingReplication, - fmt.Sprintf("connecting to the producer job %d and creating a stream replication plan", streamID)) - if err := waitUntilProducerActive(ctx, client, streamID, heartbeatTimestamp, ingestionJob.ID()); err != nil { - return err - } - - log.Infof(ctx, "producer job %d is active, creating a stream replication plan", streamID) - topology, err := client.Plan(ctx, streamID) - if err != nil { - return err - } - - // TODO(casper): update running status - err = ingestionJob.NoTxn().Update(ctx, func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error { - md.Progress.GetStreamIngest().StreamAddresses = topology.StreamAddresses() - ju.UpdateProgress(md.Progress) - return nil - }) - if err != nil { - return errors.Wrap(err, "failed to update job progress") + defer func() { + if err := client.Close(ctx); err != nil { + log.Warningf(ctx, "stream ingestion client did not shut down properly: %s", err.Error()) } - - if previousHighWater.IsEmpty() { - log.Infof(ctx, "ingestion job %d resumes stream ingestion from start time %s", - ingestionJob.ID(), initialScanTimestamp) - } else { - log.Infof(ctx, "ingestion job %d resumes stream ingestion from start time %s", - ingestionJob.ID(), previousHighWater) - } - - ingestProgress := progress.Details.(*jobspb.Progress_StreamIngest).StreamIngest - checkpoint := ingestProgress.Checkpoint - - evalCtx := execCtx.ExtendedEvalContext() - dsp := execCtx.DistSQLPlanner() - - planCtx, sqlInstanceIDs, err := dsp.SetupAllNodesPlanning(ctx, evalCtx, execCtx.ExecCfg()) - - if err != nil { - return err - } - - // Construct stream ingestion processor specs. - streamIngestionSpecs, streamIngestionFrontierSpec, err := distStreamIngestionPlanSpecs( - streamAddress, topology, sqlInstanceIDs, initialScanTimestamp, previousHighWater, checkpoint, - ingestionJob.ID(), streamID, topology.SourceTenantID, details.DestinationTenantID) - if err != nil { - return err - } - - if knobs := execCtx.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.AfterReplicationFlowPlan != nil { - knobs.AfterReplicationFlowPlan(streamIngestionSpecs, streamIngestionFrontierSpec) - } - - // Plan and run the DistSQL flow. - log.Infof(ctx, "starting to run DistSQL flow for stream ingestion job %d", - ingestionJob.ID()) - updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.Replicating, - "running the SQL flow for the stream ingestion job") - if err = distStreamIngest(ctx, execCtx, sqlInstanceIDs, planCtx, dsp, - streamIngestionSpecs, streamIngestionFrontierSpec); err != nil { - return err - } - - // A nil error is only possible if the job was signaled to cutover and the - // processors shut down gracefully, i.e stopped ingesting any additional - // events from the replication stream. At this point it is safe to revert to - // the cutoff time to leave the cluster in a consistent state. 
- log.Infof(ctx, - "starting to revert to the specified cutover timestamp for stream ingestion job %d", - ingestionJob.ID()) - if err = revertToCutoverTimestamp(ctx, execCtx, ingestionJob); err != nil { - return err - } - - log.Infof(ctx, "activating destination tenant %d", details.DestinationTenantID) - // Activate the tenant as it is now in a usable state. - if err = activateTenant(ctx, execCtx, details.DestinationTenantID); err != nil { - return err - } - - log.Infof(ctx, "starting to complete the producer job %d", streamID) - updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.ReplicationCuttingOver, - "completing the producer job in the source cluster") - // Completes the producer job in the source cluster on best effort. - if err = client.Complete(ctx, streamID, true /* successfulIngestion */); err != nil { - log.Warningf(ctx, "encountered error when completing the source cluster producer job %d", streamID) - } - - // Now that we have completed the cutover we can release the protected - // timestamp record on the destination tenant's keyspace. - if details.ProtectedTimestampRecordID != nil { - if err := execCtx.ExecCfg().InternalDB.Txn(ctx, func( - ctx context.Context, txn isql.Txn, - ) error { - ptp := execCtx.ExecCfg().ProtectedTimestampProvider.WithTxn(txn) - return releaseDestinationTenantProtectedTimestamp( - ctx, ptp, *details.ProtectedTimestampRecordID, - ) - }); err != nil { - return err - } - } - - return nil + }() + if err = startDistIngestion(ctx, execCtx, ingestionJob, client); err != nil { + return err } - return errors.CombineErrors(ingestWithClient(), client.Close(ctx)) + // A nil error is only possible if the job was signaled to cutover and the + // processors shut down gracefully, i.e stopped ingesting any additional + // events from the replication stream. At this point it is safe to revert to + // the cutoff time to leave the cluster in a consistent state. + return completeIngestion(ctx, execCtx, ingestionJob, client) } func ingestWithRetries( diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go deleted file mode 100644 index b27af599d2a3..000000000000 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_processor_planning.go +++ /dev/null @@ -1,165 +0,0 @@ -// Copyright 2020 The Cockroach Authors. -// -// Licensed as a CockroachDB Enterprise file under the Cockroach Community -// License (the "License"); you may not use this file except in compliance with -// the License. 
You may obtain a copy of the License at -// -// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt - -package streamingest - -import ( - "context" - - "github.com/cockroachdb/cockroach/pkg/base" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" - "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" - "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" - "github.com/cockroachdb/cockroach/pkg/jobs/jobsprofiler" - "github.com/cockroachdb/cockroach/pkg/kv" - "github.com/cockroachdb/cockroach/pkg/repstream/streampb" - "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" - "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/util/hlc" - "github.com/cockroachdb/logtags" -) - -func distStreamIngestionPlanSpecs( - streamAddress streamingccl.StreamAddress, - topology streamclient.Topology, - sqlInstanceIDs []base.SQLInstanceID, - initialScanTimestamp hlc.Timestamp, - previousHighWater hlc.Timestamp, - checkpoint jobspb.StreamIngestionCheckpoint, - jobID jobspb.JobID, - streamID streampb.StreamID, - sourceTenantID roachpb.TenantID, - destinationTenantID roachpb.TenantID, -) ([]*execinfrapb.StreamIngestionDataSpec, *execinfrapb.StreamIngestionFrontierSpec, error) { - // For each stream partition in the topology, assign it to a node. - streamIngestionSpecs := make([]*execinfrapb.StreamIngestionDataSpec, 0, len(sqlInstanceIDs)) - - trackedSpans := make([]roachpb.Span, 0) - subscribingSQLInstances := make(map[string]uint32) - for i, partition := range topology.Partitions { - // Round robin assign the stream partitions to nodes. Partitions 0 through - // len(nodes) - 1 creates the spec. Future partitions just add themselves to - // the partition addresses. - if i < len(sqlInstanceIDs) { - spec := &execinfrapb.StreamIngestionDataSpec{ - StreamID: uint64(streamID), - JobID: int64(jobID), - PreviousHighWaterTimestamp: previousHighWater, - InitialScanTimestamp: initialScanTimestamp, - Checkpoint: checkpoint, // TODO: Only forward relevant checkpoint info - StreamAddress: string(streamAddress), - PartitionSpecs: make(map[string]execinfrapb.StreamIngestionPartitionSpec), - TenantRekey: execinfrapb.TenantRekey{ - OldID: sourceTenantID, - NewID: destinationTenantID, - }, - } - streamIngestionSpecs = append(streamIngestionSpecs, spec) - } - n := i % len(sqlInstanceIDs) - - subscribingSQLInstances[partition.ID] = uint32(sqlInstanceIDs[n]) - streamIngestionSpecs[n].PartitionSpecs[partition.ID] = execinfrapb.StreamIngestionPartitionSpec{ - PartitionID: partition.ID, - SubscriptionToken: string(partition.SubscriptionToken), - Address: string(partition.SrcAddr), - Spans: partition.Spans, - } - - trackedSpans = append(trackedSpans, partition.Spans...) - } - - // Create a spec for the StreamIngestionFrontier processor on the coordinator - // node. 
- streamIngestionFrontierSpec := &execinfrapb.StreamIngestionFrontierSpec{ - HighWaterAtStart: previousHighWater, - TrackedSpans: trackedSpans, - JobID: int64(jobID), - StreamID: uint64(streamID), - StreamAddresses: topology.StreamAddresses(), - SubscribingSQLInstances: subscribingSQLInstances, - Checkpoint: checkpoint, - } - - return streamIngestionSpecs, streamIngestionFrontierSpec, nil -} - -func distStreamIngest( - ctx context.Context, - execCtx sql.JobExecContext, - sqlInstanceIDs []base.SQLInstanceID, - planCtx *sql.PlanningCtx, - dsp *sql.DistSQLPlanner, - streamIngestionSpecs []*execinfrapb.StreamIngestionDataSpec, - streamIngestionFrontierSpec *execinfrapb.StreamIngestionFrontierSpec, -) error { - ctx = logtags.AddTag(ctx, "stream-ingest-distsql", nil) - evalCtx := execCtx.ExtendedEvalContext() - var noTxn *kv.Txn - - if len(streamIngestionSpecs) == 0 { - return nil - } - - // Setup a one-stage plan with one proc per input spec. - corePlacement := make([]physicalplan.ProcessorCorePlacement, len(streamIngestionSpecs)) - var jobID jobspb.JobID - for i := range streamIngestionSpecs { - if i == 0 { - jobID = jobspb.JobID(streamIngestionSpecs[i].JobID) - } - corePlacement[i].SQLInstanceID = sqlInstanceIDs[i] - corePlacement[i].Core.StreamIngestionData = streamIngestionSpecs[i] - } - - p := planCtx.NewPhysicalPlan() - p.AddNoInputStage( - corePlacement, - execinfrapb.PostProcessSpec{}, - streamIngestionResultTypes, - execinfrapb.Ordering{}, - ) - - execCfg := execCtx.ExecCfg() - gatewayNodeID, err := execCfg.NodeInfo.NodeID.OptionalNodeIDErr(48274) - if err != nil { - return err - } - - // The ResultRouters from the previous stage will feed in to the - // StreamIngestionFrontier processor. - p.AddSingleGroupStage(ctx, base.SQLInstanceID(gatewayNodeID), - execinfrapb.ProcessorCoreUnion{StreamIngestionFrontier: streamIngestionFrontierSpec}, - execinfrapb.PostProcessSpec{}, streamIngestionResultTypes) - - p.PlanToStreamColMap = []int{0} - sql.FinalizePlan(ctx, planCtx, p) - - rw := sql.NewRowResultWriter(nil /* rowContainer */) - - recv := sql.MakeDistSQLReceiver( - ctx, - rw, - tree.Rows, - nil, /* rangeCache */ - noTxn, - nil, /* clockUpdater */ - evalCtx.Tracing, - ) - defer recv.Release() - - jobsprofiler.StorePlanDiagram(ctx, execCfg.DistSQLSrv.Stopper, p, execCfg.InternalDB, jobID) - - // Copy the evalCtx, as dsp.Run() might change it. - evalCtxCopy := *evalCtx - dsp.Run(ctx, planCtx, noTxn, p, recv, &evalCtxCopy, nil /* finishedSetupFn */) - return rw.Err() -} From c7eb1bfbf667e88945080bcf294a5743f9d2e773 Mon Sep 17 00:00:00 2001 From: maryliag Date: Sun, 2 Apr 2023 19:19:03 -0400 Subject: [PATCH 12/12] server, sql: show in-memory data when no data is persisted Fixes #100439 When no data is persisted to sql stats tables (because no flush happened yet or because the flush is disabled), the endpoints should fall back to the combined view that contains the in-memory data. Release note (sql change): When there is no data persisted, show the in-memory data. 
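The fallback ordering itself can be pictured with a small client-side sketch (this is not the server code in the diff below; the connection string and the row-count query are placeholders used only to illustrate consulting the persisted view before the combined in-memory view):

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq"
)

// countStatementStats mirrors the fallback described above: consult the
// persisted statistics view first and, only if it holds no rows (nothing has
// been flushed yet, or flushing is disabled), fall back to the combined view
// that also surfaces in-memory data.
func countStatementStats(db *sql.DB) (int, error) {
	var n int
	if err := db.QueryRow(
		`SELECT count(*) FROM crdb_internal.statement_statistics_persisted`,
	).Scan(&n); err != nil {
		return 0, err
	}
	if n > 0 {
		return n, nil
	}
	if err := db.QueryRow(
		`SELECT count(*) FROM crdb_internal.statement_statistics`,
	).Scan(&n); err != nil {
		return 0, err
	}
	return n, nil
}

func main() {
	// Placeholder connection string; point it at any CockroachDB node.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	n, err := countStatementStats(db)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("statement statistics rows:", n)
}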
--- pkg/server/combined_statement_stats.go | 257 +++++++++++++----- pkg/server/status_test.go | 13 - pkg/sql/internal.go | 4 + pkg/sql/isql/isql_db.go | 3 + .../src/statementsPage/statementsPage.tsx | 2 +- .../src/transactionsPage/transactionsPage.tsx | 2 +- 6 files changed, 191 insertions(+), 90 deletions(-) diff --git a/pkg/server/combined_statement_stats.go b/pkg/server/combined_statement_stats.go index 423825157766..b2ad3ba35a48 100644 --- a/pkg/server/combined_statement_stats.go +++ b/pkg/server/combined_statement_stats.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/appstatspb" + "github.com/cockroachdb/cockroach/pkg/sql/isql" "github.com/cockroachdb/cockroach/pkg/sql/sem/catconstants" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" @@ -39,6 +40,16 @@ func getTimeFromSeconds(seconds int64) *time.Time { return nil } +func closeIterator(it isql.Rows, err error) error { + if it != nil { + closeErr := it.Close() + if closeErr != nil { + err = errors.CombineErrors(err, closeErr) + } + } + return err +} + func (s *statusServer) CombinedStatementStats( ctx context.Context, req *serverpb.CombinedStatementsStatsRequest, ) (*serverpb.StatementsResponse, error) { @@ -146,7 +157,7 @@ COALESCE( (statistics-> 'statistics' ->> 'cnt')::FLOAT ) , 0) -FROM crdb_internal.%s_statistics_persisted +FROM crdb_internal.%s_statistics%s %s ` @@ -156,32 +167,58 @@ FROM crdb_internal.%s_statistics_persisted fmt.Sprintf(`%s-total-runtime`, table), nil, sessiondata.NodeUserSessionDataOverride, - fmt.Sprintf(queryWithPlaceholders, table, whereClause), + fmt.Sprintf(queryWithPlaceholders, table, `_persisted`, whereClause), args...) - if err != nil { - return 0, err - } - defer func() { - closeErr := it.Close() - if closeErr != nil { - err = errors.CombineErrors(err, closeErr) - } + err = closeIterator(it, err) }() + if err != nil { + return 0, err + } ok, err := it.Next(ctx) if err != nil { return 0, err } - if !ok { - return 0, errors.New("expected one row but got none") + return 0, errors.New("expected one row but got none on getTotalRuntimeSecs") } var row tree.Datums if row = it.Cur(); row == nil { - return 0, errors.New("unexpected null row") + return 0, errors.New("unexpected null row on getTotalRuntimeSecs") + } + + // If the total runtime is 0 there were no results from the persisted table, + // so we retrieve the data from the combined view with data in-memory. + if tree.MustBeDFloat(row[0]) == 0 { + err := closeIterator(it, err) + if err != nil { + return 0, err + } + it, err = ie.QueryIteratorEx( + ctx, + fmt.Sprintf(`%s-total-runtime-with-memory`, table), + nil, + sessiondata.NodeUserSessionDataOverride, + fmt.Sprintf(queryWithPlaceholders, table, ``, whereClause), + args...) 
+ + if err != nil { + return 0, err + } + ok, err = it.Next(ctx) + if err != nil { + return 0, err + } + if !ok { + return 0, errors.New("expected one row but got none on getTotalRuntimeSecs") + } + + if row = it.Cur(); row == nil { + return 0, errors.New("unexpected null row on getTotalRuntimeSecs") + } } return float32(tree.MustBeDFloat(row[0])), nil @@ -356,7 +393,8 @@ func collectCombinedStatements( testingKnobs *sqlstats.TestingKnobs, ) ([]serverpb.StatementsResponse_CollectedStatementStatistics, error) { aostClause := testingKnobs.GetAOSTClause() - query := fmt.Sprintf(` + const expectedNumDatums = 6 + queryFormat := ` SELECT * FROM ( SELECT fingerprint_id, @@ -365,41 +403,60 @@ SELECT max(aggregated_ts) as aggregated_ts, metadata, crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics -FROM crdb_internal.statement_statistics_persisted %s +FROM %s %s GROUP BY fingerprint_id, transaction_fingerprint_id, app_name, metadata ) %s -%s`, whereClause, aostClause, orderAndLimit) - - const expectedNumDatums = 6 +%s` + query := fmt.Sprintf( + queryFormat, + `crdb_internal.statement_statistics_persisted`, + whereClause, + aostClause, + orderAndLimit) it, err := ie.QueryIteratorEx(ctx, "combined-stmts-by-interval", nil, sessiondata.NodeUserSessionDataOverride, query, args...) + defer func() { + err = closeIterator(it, err) + }() + if err != nil { return nil, serverError(ctx, err) } - defer func() { - closeErr := it.Close() - if closeErr != nil { - err = errors.CombineErrors(err, closeErr) + // If there are no results from the persisted table, retrieve the data from the combined view + // with data in-memory. + if !it.HasResults() { + err = closeIterator(it, err) + + query = fmt.Sprintf( + queryFormat, + `crdb_internal.statement_statistics`, + whereClause, + aostClause, + orderAndLimit) + it, err = ie.QueryIteratorEx(ctx, "combined-stmts-by-interval-with-memory", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + if err != nil { + return nil, serverError(ctx, err) } - }() + } var statements []serverpb.StatementsResponse_CollectedStatementStatistics var ok bool for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { var row tree.Datums if row = it.Cur(); row == nil { - return nil, errors.New("unexpected null row") + return nil, errors.New("unexpected null row on collectCombinedStatements") } if row.Len() != expectedNumDatums { - return nil, errors.Newf("expected %d columns, received %d", expectedNumDatums) + return nil, errors.Newf("expected %d columns on collectCombinedStatements, received %d", expectedNumDatums) } var statementFingerprintID uint64 @@ -459,8 +516,8 @@ func collectCombinedTransactions( testingKnobs *sqlstats.TestingKnobs, ) ([]serverpb.StatementsResponse_ExtendedCollectedTransactionStatistics, error) { aostClause := testingKnobs.GetAOSTClause() - - query := fmt.Sprintf(` + const expectedNumDatums = 5 + queryFormat := ` SELECT * FROM ( SELECT app_name, @@ -468,40 +525,60 @@ SELECT fingerprint_id, metadata, crdb_internal.merge_transaction_stats(array_agg(statistics)) AS statistics -FROM crdb_internal.transaction_statistics_persisted %s +FROM %s %s GROUP BY app_name, fingerprint_id, metadata ) %s -%s`, whereClause, aostClause, orderAndLimit) +%s` - const expectedNumDatums = 5 + query := fmt.Sprintf( + queryFormat, + `crdb_internal.transaction_statistics_persisted`, + whereClause, + aostClause, + orderAndLimit) it, err := ie.QueryIteratorEx(ctx, "combined-txns-by-interval", nil, sessiondata.NodeUserSessionDataOverride, query, args...) 
+ defer func() { + err = closeIterator(it, err) + }() + if err != nil { return nil, serverError(ctx, err) } - defer func() { - closeErr := it.Close() - if closeErr != nil { - err = errors.CombineErrors(err, closeErr) + // If there are no results from the persisted table, retrieve the data from the combined view + // with data in-memory. + if !it.HasResults() { + err = closeIterator(it, err) + + query = fmt.Sprintf( + queryFormat, + `crdb_internal.transaction_statistics`, + whereClause, + aostClause, + orderAndLimit) + it, err = ie.QueryIteratorEx(ctx, "combined-txn-by-interval-with-memory", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + if err != nil { + return nil, serverError(ctx, err) } - }() + } var transactions []serverpb.StatementsResponse_ExtendedCollectedTransactionStatistics var ok bool for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { var row tree.Datums if row = it.Cur(); row == nil { - return nil, errors.New("unexpected null row") + return nil, errors.New("unexpected null row on collectCombinedTransactions") } if row.Len() != expectedNumDatums { - return nil, errors.Newf("expected %d columns, received %d", expectedNumDatums, row.Len()) + return nil, errors.Newf("expected %d columns on collectCombinedTransactions, received %d", expectedNumDatums, row.Len()) } app := string(tree.MustBeDString(row[0])) @@ -684,31 +761,43 @@ func getStatementDetailsQueryClausesAndArgs( func getTotalStatementDetails( ctx context.Context, ie *sql.InternalExecutor, whereClause string, args []interface{}, ) (serverpb.StatementDetailsResponse_CollectedStatementSummary, error) { - query := fmt.Sprintf( - `SELECT + const expectedNumDatums = 4 + var statement serverpb.StatementDetailsResponse_CollectedStatementSummary + queryFormat := `SELECT crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata, array_agg(app_name) as app_names, crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics, encode(fingerprint_id, 'hex') as fingerprint_id - FROM crdb_internal.statement_statistics_persisted %s + FROM %s %s GROUP BY fingerprint_id - LIMIT 1`, whereClause) - - const expectedNumDatums = 4 - var statement serverpb.StatementDetailsResponse_CollectedStatementSummary + LIMIT 1` + query := fmt.Sprintf(queryFormat, `crdb_internal.statement_statistics_persisted`, whereClause) row, err := ie.QueryRowEx(ctx, "combined-stmts-details-total", nil, sessiondata.NodeUserSessionDataOverride, query, args...) - if err != nil { return statement, serverError(ctx, err) } - if len(row) == 0 { + + // If there are no results from the persisted table, retrieve the data from the combined view + // with data in-memory. + if row.Len() == 0 { + query = fmt.Sprintf(queryFormat, `crdb_internal.statement_statistics`, whereClause) + row, err = ie.QueryRowEx(ctx, "combined-stmts-details-total-with-memory", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + if err != nil { + return statement, serverError(ctx, err) + } + } + + // If there are no results in-memory, return empty statement object. 
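+	// In that case both the persisted table and the in-memory view were empty.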
+ if row.Len() == 0 { return statement, nil } if row.Len() != expectedNumDatums { - return statement, serverError(ctx, errors.Newf("expected %d columns, received %d", expectedNumDatums)) + return statement, serverError(ctx, errors.Newf( + "expected %d columns on getTotalStatementDetails, received %d", expectedNumDatums)) } var statistics appstatspb.CollectedStatementStatistics @@ -752,44 +841,52 @@ func getStatementDetailsPerAggregatedTs( args []interface{}, limit int64, ) ([]serverpb.StatementDetailsResponse_CollectedStatementGroupedByAggregatedTs, error) { - query := fmt.Sprintf( - `SELECT + const expectedNumDatums = 3 + queryFormat := `SELECT aggregated_ts, crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata, crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics - FROM crdb_internal.statement_statistics_persisted %s + FROM %s %s GROUP BY aggregated_ts ORDER BY aggregated_ts ASC - LIMIT $%d`, whereClause, len(args)+1) - + LIMIT $%d` + query := fmt.Sprintf(queryFormat, `crdb_internal.statement_statistics_persisted`, whereClause, len(args)+1) args = append(args, limit) - const expectedNumDatums = 3 it, err := ie.QueryIteratorEx(ctx, "combined-stmts-details-by-aggregated-timestamp", nil, sessiondata.NodeUserSessionDataOverride, query, args...) + defer func() { + err = closeIterator(it, err) + }() + if err != nil { return nil, serverError(ctx, err) } - defer func() { - closeErr := it.Close() - if closeErr != nil { - err = errors.CombineErrors(err, closeErr) + // If there are no results from the persisted table, retrieve the data from the combined view + // with data in-memory. + if !it.HasResults() { + err = closeIterator(it, err) + query = fmt.Sprintf(queryFormat, `crdb_internal.statement_statistics`, whereClause, len(args)) + it, err = ie.QueryIteratorEx(ctx, "combined-stmts-details-by-aggregated-timestamp-with-memory", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + if err != nil { + return nil, serverError(ctx, err) } - }() + } var statements []serverpb.StatementDetailsResponse_CollectedStatementGroupedByAggregatedTs var ok bool for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { var row tree.Datums if row = it.Cur(); row == nil { - return nil, errors.New("unexpected null row") + return nil, errors.New("unexpected null row on getStatementDetailsPerAggregatedTs") } if row.Len() != expectedNumDatums { - return nil, errors.Newf("expected %d columns, received %d", expectedNumDatums) + return nil, errors.Newf("expected %d columns on getStatementDetailsPerAggregatedTs, received %d", expectedNumDatums) } aggregatedTs := tree.MustBeDTimestampTZ(row[0]).Time @@ -832,6 +929,10 @@ func getExplainPlanFromGist(ctx context.Context, ie *sql.InternalExecutor, planG it, err := ie.QueryIteratorEx(ctx, "combined-stmts-details-get-explain-plan", nil, sessiondata.NodeUserSessionDataOverride, query, args...) 
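+	// Ensure the iterator is closed on every return path below.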
+ defer func() { + err = closeIterator(it, err) + }() + if err != nil { return planError } @@ -892,49 +993,55 @@ func getStatementDetailsPerPlanHash( args []interface{}, limit int64, ) ([]serverpb.StatementDetailsResponse_CollectedStatementGroupedByPlanHash, error) { - - query := fmt.Sprintf( - `SELECT + expectedNumDatums := 5 + queryFormat := `SELECT plan_hash, (statistics -> 'statistics' -> 'planGists'->>0) as plan_gist, crdb_internal.merge_stats_metadata(array_agg(metadata)) AS metadata, crdb_internal.merge_statement_stats(array_agg(statistics)) AS statistics, index_recommendations - FROM crdb_internal.statement_statistics_persisted %s + FROM %s %s GROUP BY plan_hash, plan_gist, index_recommendations - LIMIT $%d`, whereClause, len(args)+1) - - expectedNumDatums := 5 - + LIMIT $%d` + query := fmt.Sprintf(queryFormat, `crdb_internal.statement_statistics_persisted`, whereClause, len(args)+1) args = append(args, limit) it, err := ie.QueryIteratorEx(ctx, "combined-stmts-details-by-plan-hash", nil, sessiondata.NodeUserSessionDataOverride, query, args...) + defer func() { + err = closeIterator(it, err) + }() + if err != nil { return nil, serverError(ctx, err) } - defer func() { - closeErr := it.Close() - if closeErr != nil { - err = errors.CombineErrors(err, closeErr) + // If there are no results from the persisted table, retrieve the data from the combined view + // with data in-memory. + if !it.HasResults() { + err = closeIterator(it, err) + query = fmt.Sprintf(queryFormat, `crdb_internal.statement_statistics`, whereClause, len(args)) + it, err = ie.QueryIteratorEx(ctx, "combined-stmts-details-by-plan-hash-with-memory", nil, + sessiondata.NodeUserSessionDataOverride, query, args...) + if err != nil { + return nil, serverError(ctx, err) } - }() + } var statements []serverpb.StatementDetailsResponse_CollectedStatementGroupedByPlanHash var ok bool for ok, err = it.Next(ctx); ok; ok, err = it.Next(ctx) { var row tree.Datums if row = it.Cur(); row == nil { - return nil, errors.New("unexpected null row") + return nil, errors.New("unexpected null row on getStatementDetailsPerPlanHash") } if row.Len() != expectedNumDatums { - return nil, errors.Newf("expected %d columns, received %d", expectedNumDatums) + return nil, errors.Newf("expected %d columns on getStatementDetailsPerPlanHash, received %d", expectedNumDatums) } var planHash uint64 diff --git a/pkg/server/status_test.go b/pkg/server/status_test.go index 672b420b2a35..8a505dcddd57 100644 --- a/pkg/server/status_test.go +++ b/pkg/server/status_test.go @@ -52,7 +52,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlstats" - "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats" "github.com/cockroachdb/cockroach/pkg/sql/tests" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" @@ -1646,9 +1645,6 @@ func TestStatusAPICombinedTransactions(t *testing.T) { } } - // Flush stats, as combinedstmts reads only from system. - thirdServer.SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) - // Hit query endpoint. 
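+	// No stats flush is needed before hitting the endpoint anymore: it now falls
+	// back to in-memory statistics when the persisted tables are empty.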
var resp serverpb.StatementsResponse if err := getStatusJSONProto(firstServerProto, "combinedstmts", &resp); err != nil { @@ -2021,8 +2017,6 @@ func TestStatusAPICombinedStatements(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - ctx := context.Background() - // Aug 30 2021 19:50:00 GMT+0000 aggregatedTs := int64(1630353000) testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{ @@ -2061,8 +2055,6 @@ func TestStatusAPICombinedStatements(t *testing.T) { thirdServerSQL.Exec(t, stmt.stmt) } - testCluster.Server(2).SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) - var resp serverpb.StatementsResponse // Test that non-admin without VIEWACTIVITY privileges cannot access. err := getStatusJSONProtoWithAdminOption(firstServerProto, "combinedstmts", &resp, false) @@ -2192,8 +2184,6 @@ func TestStatusAPIStatementDetails(t *testing.T) { // The liveness session might expire before the stress race can finish. skip.UnderStressRace(t, "expensive tests") - ctx := context.Background() - // Aug 30 2021 19:50:00 GMT+0000 aggregatedTs := int64(1630353000) testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{ @@ -2252,9 +2242,6 @@ func TestStatusAPIStatementDetails(t *testing.T) { } testPath := func(path string, expected resultValues) { - // Need to flush since this EP reads only flushed data. - testCluster.Server(2).SQLServer().(*sql.Server).GetSQLStatsProvider().(*persistedsqlstats.PersistedSQLStats).Flush(ctx) - err := getStatusJSONProtoWithAdminOption(firstServerProto, path, &resp, false) require.NoError(t, err) require.Equal(t, int64(expected.totalCount), resp.Statement.Stats.Count) diff --git a/pkg/sql/internal.go b/pkg/sql/internal.go index 42d83c901d14..34baa0c55131 100644 --- a/pkg/sql/internal.go +++ b/pkg/sql/internal.go @@ -501,6 +501,10 @@ func (r *rowsIterator) Types() colinfo.ResultColumns { return r.resultCols } +func (r *rowsIterator) HasResults() bool { + return r.first.row != nil +} + // QueryBuffered executes the supplied SQL statement and returns the resulting // rows (meaning all of them are buffered at once). If no user has been // previously set through SetSessionData, the statement is executed as the root diff --git a/pkg/sql/isql/isql_db.go b/pkg/sql/isql/isql_db.go index 9fce96d06ad8..554938036b96 100644 --- a/pkg/sql/isql/isql_db.go +++ b/pkg/sql/isql/isql_db.go @@ -244,4 +244,7 @@ type Rows interface { // WARNING: this method is safe to call anytime *after* the first call to // Next() (including after Close() was called). Types() colinfo.ResultColumns + + // HasResults returns true if there are results to the query, false otherwise. 
+ HasResults() bool } diff --git a/pkg/ui/workspaces/cluster-ui/src/statementsPage/statementsPage.tsx b/pkg/ui/workspaces/cluster-ui/src/statementsPage/statementsPage.tsx index 7c23dda95fe5..f408db14884d 100644 --- a/pkg/ui/workspaces/cluster-ui/src/statementsPage/statementsPage.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/statementsPage/statementsPage.tsx @@ -280,7 +280,7 @@ export class StatementsPage extends React.Component< isSortSettingSameAsReqSort = (): boolean => { return ( - getSortColumn(this.state.reqSortSetting) == + getSortColumn(this.props.reqSortSetting) == this.props.sortSetting.columnTitle ); }; diff --git a/pkg/ui/workspaces/cluster-ui/src/transactionsPage/transactionsPage.tsx b/pkg/ui/workspaces/cluster-ui/src/transactionsPage/transactionsPage.tsx index 070c3c6038b9..872fbb7b9274 100644 --- a/pkg/ui/workspaces/cluster-ui/src/transactionsPage/transactionsPage.tsx +++ b/pkg/ui/workspaces/cluster-ui/src/transactionsPage/transactionsPage.tsx @@ -294,7 +294,7 @@ export class TransactionsPage extends React.Component< isSortSettingSameAsReqSort = (): boolean => { return ( - getSortColumn(this.state.reqSortSetting) == + getSortColumn(this.props.reqSortSetting) == this.props.sortSetting.columnTitle ); };
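
The Go changes above repeat one pattern across the statement and transaction statistics functions: query the persisted statistics first and, when nothing comes back, rerun the same query against the combined view that also includes in-memory data. The sketch below is a simplified illustration of that control flow, not the production code: the rows interface and the query callback are hypothetical stand-ins for isql.Rows and the internal executor's QueryIteratorEx, and errors.Join stands in for errors.CombineErrors.

// Package statsfallback sketches the persisted-then-in-memory fallback pattern
// introduced in combined_statement_stats.go (simplified illustration only).
package statsfallback

import "errors"

// rows is a minimal stand-in for the isql.Rows iterator used in the patch.
type rows interface {
	HasResults() bool
	Close() error
}

// closeIterator mirrors the helper added in combined_statement_stats.go: it
// combines any Close error with the error already in hand and is safe to call
// with a nil iterator.
func closeIterator(it rows, err error) error {
	if it != nil {
		if closeErr := it.Close(); closeErr != nil {
			err = errors.Join(err, closeErr)
		}
	}
	return err
}

// queryWithFallback issues the query against the persisted statistics table
// first and, when that returns no rows, closes the iterator and reissues the
// same query against the combined view that also includes in-memory data.
func queryWithFallback(query func(table string) (rows, error)) (rows, error) {
	it, err := query("crdb_internal.statement_statistics_persisted")
	if err != nil {
		return nil, err
	}
	if !it.HasResults() {
		if err := closeIterator(it, nil); err != nil {
			return nil, err
		}
		it, err = query("crdb_internal.statement_statistics")
		if err != nil {
			return nil, err
		}
	}
	return it, nil
}

In the patch itself the caller registers a deferred closeIterator right after the first query, so whichever iterator ends up in `it` is closed on return; checking HasResults before entering the row loop keeps the fallback in one place instead of duplicating the per-row decoding for the persisted and in-memory paths.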