From 0522b1e1839c98c7fb8a37376e16bde418d7fdc1 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Tue, 19 Jul 2022 09:05:08 -0400 Subject: [PATCH] sql: Plan and execution of CDC expressions. Prior to this change, CDC expression evaluation was managed by CDC library (`cdceval`) directly. This PR introduces a mechanism to plan and execute CDC expressions using optimizer and distSQL: `PlanCDCExpression` plans the execution of CDC expressions using optimizer, and `RunCDCEvaluation`, which is just a thin wrapper around regular distsql mechanisms (`PlanAndRun`), can be used to execute this plan. This library offers significant benefits to the CDC expressions: namely, CDC expressions will be more tightly integrated with optimizer and dist SQL execution framework. This in turn opens up additional venues for additional improvements over time. Release notes: None --- pkg/ccl/changefeedccl/cdceval/BUILD.bazel | 1 + pkg/sql/BUILD.bazel | 3 + pkg/sql/distsql_physical_planner.go | 2 + pkg/sql/distsql_plan_changefeed.go | 327 ++++++++++++++++ pkg/sql/distsql_plan_changefeed_test.go | 450 ++++++++++++++++++++++ pkg/sql/instrumentation.go | 4 +- pkg/sql/opt/exec/execbuilder/scalar.go | 3 +- pkg/sql/opt_catalog.go | 13 +- pkg/sql/plan.go | 2 +- pkg/sql/plan_columns.go | 2 + pkg/sql/plan_node_to_row_source.go | 12 +- pkg/sql/plan_opt.go | 29 +- pkg/sql/walk.go | 1 + 13 files changed, 826 insertions(+), 23 deletions(-) create mode 100644 pkg/sql/distsql_plan_changefeed.go create mode 100644 pkg/sql/distsql_plan_changefeed_test.go diff --git a/pkg/ccl/changefeedccl/cdceval/BUILD.bazel b/pkg/ccl/changefeedccl/cdceval/BUILD.bazel index 93e9e12a91a7..af9c2910fd9c 100644 --- a/pkg/ccl/changefeedccl/cdceval/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdceval/BUILD.bazel @@ -34,6 +34,7 @@ go_library( "//pkg/sql/sem/normalize", "//pkg/sql/sem/tree", "//pkg/sql/sem/volatility", + "//pkg/sql/sessiondata", "//pkg/sql/sessiondatapb", "//pkg/sql/types", "//pkg/util/hlc", diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index cea854c768c8..6dd490ed3009 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -79,6 +79,7 @@ go_library( "distsql_physical_planner.go", "distsql_plan_backfill.go", "distsql_plan_bulk.go", + "distsql_plan_changefeed.go", "distsql_plan_ctas.go", "distsql_plan_join.go", "distsql_plan_set_op.go", @@ -561,6 +562,7 @@ go_test( "distsql_physical_planner_test.go", "distsql_plan_backfill_test.go", "distsql_plan_bulk_test.go", + "distsql_plan_changefeed_test.go", "distsql_plan_set_op_test.go", "distsql_running_test.go", "drop_function_test.go", @@ -689,6 +691,7 @@ go_test( "//pkg/sql/catalog/bootstrap", "//pkg/sql/catalog/catalogkeys", "//pkg/sql/catalog/catpb", + "//pkg/sql/catalog/colinfo", "//pkg/sql/catalog/descidgen", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", diff --git a/pkg/sql/distsql_physical_planner.go b/pkg/sql/distsql_physical_planner.go index 2bf1645723d6..852365b5bbfc 100644 --- a/pkg/sql/distsql_physical_planner.go +++ b/pkg/sql/distsql_physical_planner.go @@ -686,6 +686,8 @@ func checkSupportForPlanNode(node planNode) (distRecommendation, error) { return cannotDistribute, err } return shouldDistribute, nil + case *cdcValuesNode: + return cannotDistribute, nil default: return cannotDistribute, planNodeNotSupportedErr diff --git a/pkg/sql/distsql_plan_changefeed.go b/pkg/sql/distsql_plan_changefeed.go new file mode 100644 index 000000000000..8ecf6aeeec31 --- /dev/null +++ b/pkg/sql/distsql_plan_changefeed.go @@ -0,0 +1,327 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/resolver" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/clusterunique" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" + "github.com/cockroachdb/cockroach/pkg/sql/opt/optbuilder" + "github.com/cockroachdb/cockroach/pkg/sql/parser" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/errors" +) + +// CDCExpression represents CDC specific expression to plan. +// This expression is just a limited select clause; +// see sql.y for details. The expression is normalized and validated by CDC +// (see cdc_eval library) to ensure the expression is safe to use in CDC +// context. +type CDCExpression = *tree.Select + +// CDCExpressionPlan encapsulates execution plan for evaluation of CDC expressions. +type CDCExpressionPlan struct { + Plan planMaybePhysical // Underlying plan... + PlanCtx *PlanningCtx // ... and plan context + Spans roachpb.Spans // Set of spans for rangefeed. + Presentation colinfo.ResultColumns // List of result columns. +} + +// PlanCDCExpression plans the execution of CDCExpression. +// +// CDC expressions may contain only a single table. Because of the limited +// nature of the CDCExpression, this code assumes (and verifies) that the +// produced plan has only one instance of *scanNode. +// +// localPlanner is assumed to be an instance of planner created specifically for +// planning and execution of CDC expressions. This planner ought to be +// configured appropriately to resolve correct descriptor versions. +func PlanCDCExpression( + ctx context.Context, localPlanner interface{}, cdcExpr CDCExpression, +) (cdcPlan CDCExpressionPlan, _ error) { + p := localPlanner.(*planner) + p.stmt = makeStatement(parser.Statement{ + AST: cdcExpr, + SQL: tree.AsString(cdcExpr), + }, clusterunique.ID{} /* queryID */) + + p.curPlan.init(&p.stmt, &p.instrumentation) + p.optPlanningCtx.init(p) + opc := &p.optPlanningCtx + opc.reset(ctx) + + cdcCat := &cdcOptCatalog{ + optCatalog: opc.catalog.(*optCatalog), + } + opc.catalog = cdcCat + + // We could use opc.buildExecMemo; alas, it has too much logic we don't + // need, and, it also allows stable fold -- something we don't want to do. + // So, just build memo ourselves. + f := opc.optimizer.Factory() + f.FoldingControl().DisallowStableFolds() + bld := optbuilder.New(ctx, &p.semaCtx, p.EvalContext(), opc.catalog, f, opc.p.stmt.AST) + if err := bld.Build(); err != nil { + return cdcPlan, err + } + + oe, err := opc.optimizer.Optimize() + if err != nil { + return cdcPlan, err + } + if log.V(2) { + log.Infof(ctx, "Optimized CDC expression: %s", oe.String()) + } + execMemo := f.Memo() + + const allowAutoCommit = false + if err := opc.runExecBuilder( + ctx, &p.curPlan, &p.stmt, newExecFactory(ctx, p), execMemo, p.EvalContext(), allowAutoCommit, + ); err != nil { + return cdcPlan, err + } + + // Walk the plan, perform sanity checks and extract information we need. + var spans roachpb.Spans + var presentation colinfo.ResultColumns + + if err := walkPlan(ctx, p.curPlan.main.planNode, planObserver{ + enterNode: func(ctx context.Context, nodeName string, plan planNode) (bool, error) { + switch n := plan.(type) { + case *scanNode: + // Collect spans we wanted to scan. The select statement used for this + // plan should result in a single table scan of primary index span. + if len(spans) > 0 { + return false, errors.AssertionFailedf("unexpected multiple primary index scan operations") + } + if n.index.GetID() != n.desc.GetPrimaryIndexID() { + return false, errors.AssertionFailedf( + "expect scan of primary index, found scan of %d", n.index.GetID()) + } + spans = n.spans + case *zeroNode: + return false, errors.Newf( + "changefeed expression %s does not match any rows", tree.AsString(cdcExpr)) + } + + // Because the walk is top down, the top node is the node containing the + // list of columns to return. + if len(presentation) == 0 { + presentation = planColumns(plan) + } + return true, nil + }, + }); err != nil { + return cdcPlan, err + } + + if len(spans) == 0 { + // Should have been handled by the zeroNode check above. + return cdcPlan, errors.AssertionFailedf("expected at least 1 span to scan") + } + + if len(presentation) == 0 { + return cdcPlan, errors.AssertionFailedf("unable to determine result columns") + } + + planCtx := p.DistSQLPlanner().NewPlanningCtx(ctx, &p.extendedEvalCtx, p, p.txn, DistributionTypeNone) + planCtx.isLocal = true + + return CDCExpressionPlan{ + Plan: p.curPlan.main, + PlanCtx: planCtx, + Spans: spans, + Presentation: presentation, + }, nil +} + +// RunCDCEvaluation runs plan previously prepared by PlanCDCExpression. +// Data is pushed into this flow from source, which generates data for the +// specified table columns. +// Results of evaluations are written to the receiver. +func RunCDCEvaluation( + ctx context.Context, + cdcPlan CDCExpressionPlan, + source execinfra.RowSource, + sourceCols catalog.TableColMap, + receiver *DistSQLReceiver, +) (err error) { + cdcPlan.Plan.planNode, err = prepareCDCPlan(ctx, cdcPlan.Plan.planNode, source, sourceCols) + if err != nil { + return err + } + + // Execute. + p := cdcPlan.PlanCtx.planner + p.DistSQLPlanner().PlanAndRun( + ctx, &p.extendedEvalCtx, cdcPlan.PlanCtx, p.txn, cdcPlan.Plan, receiver) + return nil +} + +func prepareCDCPlan( + ctx context.Context, plan planNode, source execinfra.RowSource, sourceCols catalog.TableColMap, +) (planNode, error) { + // Replace a single scan node (this was checked when constructing + // CDCExpressionPlan) with a cdcValuesNode that reads from the source, which + // includes specified column IDs. + replaced := false + v := makePlanVisitor(ctx, planObserver{ + replaceNode: func(ctx context.Context, nodeName string, plan planNode) (planNode, error) { + scan, ok := plan.(*scanNode) + if !ok { + return nil, nil + } + replaced = true + defer scan.Close(ctx) + return newCDCValuesNode(scan, source, sourceCols) + }, + }) + plan = v.visit(plan) + if v.err != nil { + return nil, v.err + } + if !replaced { + return nil, errors.AssertionFailedf("expected to find one scan node, found none") + } + return plan, nil +} + +// cdcValuesNode replaces regular scanNode with cdc specific implementation +// which returns values from the execinfra.RowSource. +type cdcValuesNode struct { + source execinfra.RowSource + datumRow []tree.Datum + colOrd []int + resultColumns []colinfo.ResultColumn + alloc tree.DatumAlloc +} + +var _ planNode = (*cdcValuesNode)(nil) + +func newCDCValuesNode( + scan *scanNode, source execinfra.RowSource, sourceCols catalog.TableColMap, +) (planNode, error) { + v := cdcValuesNode{ + source: source, + datumRow: make([]tree.Datum, len(scan.resultColumns)), + resultColumns: scan.resultColumns, + colOrd: make([]int, len(scan.cols)), + } + + for i, c := range scan.cols { + sourceOrd, ok := sourceCols.Get(c.GetID()) + if !ok { + return nil, errors.Newf("source does not contain column %s (id %d)", c.GetName(), c.GetID()) + } + v.colOrd[i] = sourceOrd + } + + return &v, nil +} + +// startExec implements planNode. +func (n *cdcValuesNode) startExec(params runParams) error { + n.source.Start(params.ctx) + return nil +} + +// Next implements planNode. +func (n *cdcValuesNode) Next(params runParams) (bool, error) { + row, meta := n.source.Next() + if meta != nil { + return false, errors.AssertionFailedf("unexpected producer meta returned") + } + if row == nil { + return false, nil + } + + typs := n.source.OutputTypes() + for i, ord := range n.colOrd { + if err := row[ord].EnsureDecoded(typs[ord], &n.alloc); err != nil { + return false, err + } + n.datumRow[i] = row[ord].Datum + } + return true, nil +} + +// Values implements planNode. +func (n *cdcValuesNode) Values() tree.Datums { + return n.datumRow +} + +// Close implements planNode. +func (n *cdcValuesNode) Close(ctx context.Context) { +} + +type cdcOptCatalog struct { + *optCatalog +} + +var _ cat.Catalog = (*cdcOptCatalog)(nil) + +// ResolveDataSource implements cat.Catalog interface. +// We provide custom implementation to ensure that we return data source for +// primary index. +func (c *cdcOptCatalog) ResolveDataSource( + ctx context.Context, flags cat.Flags, name *cat.DataSourceName, +) (cat.DataSource, cat.DataSourceName, error) { + lflags := tree.ObjectLookupFlagsWithRequiredTableKind(tree.ResolveRequireTableDesc) + _, desc, err := resolver.ResolveExistingTableObject(ctx, c.planner, name, lflags) + if err != nil { + return nil, cat.DataSourceName{}, err + } + + ds, err := c.newCDCDataSource(desc) + if err != nil { + return nil, *name, err + } + return ds, *name, nil +} + +// ResolveDataSourceByID implements cat.Catalog interface. +// We provide custom implementation to ensure that we return data source for +// primary index span. +func (c *cdcOptCatalog) ResolveDataSourceByID( + ctx context.Context, flags cat.Flags, id cat.StableID, +) (cat.DataSource, bool, error) { + desc, err := c.planner.LookupTableByID(ctx, descpb.ID(id)) + if err != nil { + return nil, false, err + } + ds, err := c.newCDCDataSource(desc) + if err != nil { + return nil, false, err + } + return ds, false, nil +} + +// newCDCDataSource builds an optTable for the target cdc table. +// The descriptor presented to the optimizer hides all but the primary index. +// TODO(yevgeniy): We should be able to use secondary indexes provided +// the CDC expression access only the columns available in that secondary index. +func (c *cdcOptCatalog) newCDCDataSource(original catalog.TableDescriptor) (cat.DataSource, error) { + // Build descriptor with all indexes other than primary removed. + desc := protoutil.Clone(original.TableDesc()).(*descpb.TableDescriptor) + desc.Indexes = desc.Indexes[:0] + updated := tabledesc.NewBuilder(desc).BuildImmutableTable() + return newOptTable(updated, c.codec(), nil, emptyZoneConfig) +} diff --git a/pkg/sql/distsql_plan_changefeed_test.go b/pkg/sql/distsql_plan_changefeed_test.go new file mode 100644 index 000000000000..1de5b52a3d0d --- /dev/null +++ b/pkg/sql/distsql_plan_changefeed_test.go @@ -0,0 +1,450 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package sql + +import ( + "context" + "math/rand" + "testing" + + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/security/username" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/parser" + "github.com/cockroachdb/cockroach/pkg/sql/randgen" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" + "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" + "github.com/cockroachdb/cockroach/pkg/sql/tests" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/stretchr/testify/require" +) + +// Tests in this file test CDC specific logic when planning/executing +// expressions. CDC expressions are a restricted form of a select statement. The +// tests here do not test full CDC semantics, and instead just use expressions +// that satisfy CDC requirement. The tests here also do not test any CDC +// specific functions; this is done elsewhere. + +func TestChangefeedLogicalPlan(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + params, _ := tests.CreateTestServerParams() + s, db, kvDB := serverutils.StartServer(t, params) + defer s.Stopper().Stop(context.Background()) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE TABLE foo (a INT, b int, c STRING, CONSTRAINT "pk" PRIMARY KEY (a, b), UNIQUE (c))`) + fooDesc := desctestutils.TestingGetTableDescriptor( + kvDB, keys.SystemSQLCodec, "defaultdb", "public", "foo") + + ctx := context.Background() + execCfg := s.ExecutorConfig().(ExecutorConfig) + + p, cleanup := NewInternalPlanner("test", kv.NewTxn(ctx, kvDB, s.NodeID()), + username.RootUserName(), &MemoryMetrics{}, &execCfg, sessiondatapb.SessionData{ + Database: "defaultdb", + SearchPath: sessiondata.DefaultSearchPath.GetPathArray(), + }, + ) + defer cleanup() + + primarySpan := fooDesc.PrimaryIndexSpan(keys.SystemSQLCodec) + pkStart := primarySpan.Key + pkEnd := primarySpan.EndKey + fooID := fooDesc.GetID() + + rc := func(n string, typ *types.T) colinfo.ResultColumn { + return colinfo.ResultColumn{Name: n, Typ: typ} + } + allColumns := colinfo.ResultColumns{ + rc("a", types.Int), rc("b", types.Int), rc("c", types.String), + } + checkPresentation := func(t *testing.T, expected, found colinfo.ResultColumns) { + t.Helper() + require.Equal(t, len(expected), len(found)) + for i := 0; i < len(found); i++ { + require.Equal(t, expected[i].Name, found[i].Name, "e=%v f=%v", expected[i], found[i]) + require.True(t, expected[i].Typ.Equal(found[i].Typ), "e=%v f=%v", expected[i], found[i]) + } + } + + for _, tc := range []struct { + stmt string + expectErr string + expectSpans roachpb.Spans + present colinfo.ResultColumns + }{ + { + stmt: "SELECT * FROM foo WHERE 5 > 1", + expectSpans: roachpb.Spans{primarySpan}, + present: allColumns, + }, + { + stmt: "SELECT * FROM foo WHERE 0 != 0", + expectErr: "does not match any rows", + }, + { + stmt: "SELECT * FROM foo WHERE a IS NULL", + expectErr: "does not match any rows", + }, + { + stmt: "SELECT * FROM foo WHERE a > 3 AND a < 3", + expectErr: "does not match any rows", + }, + { + stmt: "SELECT * FROM foo WHERE 5 > 1 UNION SELECT * FROM foo WHERE a < 1", + expectErr: "unexpected multiple primary index scan operations", + }, + { + stmt: "SELECT * FROM foo WHERE a >=3 or a < 3", + expectSpans: roachpb.Spans{primarySpan}, + present: allColumns, + }, + { + stmt: "SELECT * FROM foo WHERE 5", + expectErr: "argument of WHERE must be type bool, not type int", + }, + { + stmt: "SELECT * FROM foo WHERE no_such_column = 'something'", + expectErr: `column "no_such_column" does not exist`, + }, + { + stmt: "SELECT * FROM foo WHERE true", + expectSpans: roachpb.Spans{primarySpan}, + present: allColumns, + }, + { + stmt: "SELECT * FROM foo WHERE false", + expectErr: "does not match any rows", + }, + { + stmt: "SELECT * FROM foo WHERE a > 100", + expectSpans: roachpb.Spans{{Key: mkPkKey(t, fooID, 101), EndKey: pkEnd}}, + present: allColumns, + }, + { + stmt: "SELECT * FROM foo WHERE a < 100", + expectSpans: roachpb.Spans{{Key: pkStart, EndKey: mkPkKey(t, fooID, 100)}}, + present: allColumns, + }, + { + stmt: "SELECT * FROM foo WHERE a > 10 AND a > 5", + expectSpans: roachpb.Spans{{Key: mkPkKey(t, fooID, 11), EndKey: pkEnd}}, + present: allColumns, + }, + { + stmt: "SELECT * FROM foo WHERE a > 10 OR a > 5", + expectSpans: roachpb.Spans{{Key: mkPkKey(t, fooID, 6), EndKey: pkEnd}}, + present: allColumns, + }, + { + stmt: "SELECT * FROM foo WHERE a > 100 AND a <= 101", + expectSpans: roachpb.Spans{{Key: mkPkKey(t, fooID, 101), EndKey: mkPkKey(t, fooID, 102)}}, + present: allColumns, + }, + { + stmt: "SELECT * FROM foo WHERE a > 100 and a < 200", + expectSpans: roachpb.Spans{{Key: mkPkKey(t, fooID, 101), EndKey: mkPkKey(t, fooID, 200)}}, + present: allColumns, + }, + { + stmt: "SELECT * FROM foo WHERE a > 100 or a <= 99", + expectSpans: roachpb.Spans{ + {Key: pkStart, EndKey: mkPkKey(t, fooID, 100)}, + {Key: mkPkKey(t, fooID, 101), EndKey: pkEnd}, + }, + present: allColumns, + }, + { + stmt: "SELECT * FROM foo WHERE a > 100 AND b > 11", + expectSpans: roachpb.Spans{{Key: mkPkKey(t, fooID, 101, 12), EndKey: pkEnd}}, + present: allColumns, + }, + { + // Same as above, but with table alias -- we expect remaining expression to + // preserve the alias. + stmt: "SELECT * FROM foo AS buz WHERE buz.a > 100 AND b > 11", + expectSpans: roachpb.Spans{{Key: mkPkKey(t, fooID, 101, 12), EndKey: pkEnd}}, + present: allColumns, + }, + { + // Same as above, but w/ silly tautology, which should be removed. + stmt: "SELECT * FROM defaultdb.public.foo WHERE (a > 3 OR a <= 3) AND a > 100 AND b > 11", + expectSpans: roachpb.Spans{{Key: mkPkKey(t, fooID, 101, 12), EndKey: pkEnd}}, + present: allColumns, + }, + { + stmt: "SELECT * FROM foo WHERE a < 42 OR (a > 100 AND b > 11)", + expectSpans: roachpb.Spans{ + {Key: pkStart, EndKey: mkPkKey(t, fooID, 42)}, + {Key: mkPkKey(t, fooID, 101, 12), EndKey: pkEnd}, + }, + present: allColumns, + }, + { + // Same as above, but now with tuples. + stmt: "SELECT * FROM foo WHERE a < 42 OR ((a, b) > (100, 11))", + expectSpans: roachpb.Spans{ + {Key: pkStart, EndKey: mkPkKey(t, fooID, 42)}, + // Remember: tuples use lexicographical ordering so the start key is + // /Table/104/1/100/12 (i.e. a="100" and b="12" (because 100/12 lexicographically follows 100). + {Key: mkPkKey(t, fooID, 100, 12), EndKey: pkEnd}, + }, + present: allColumns, + }, + { + stmt: "SELECT * FROM foo WHERE (a, b) > (2, 5)", + expectSpans: roachpb.Spans{{Key: mkPkKey(t, fooID, 2, 6), EndKey: pkEnd}}, + present: allColumns, + }, + { + // Test that aliased table names work. + stmt: "SELECT * FROM foo as buz WHERE (buz.a, buz.b) > (2, 5)", + expectSpans: roachpb.Spans{{Key: mkPkKey(t, fooID, 2, 6), EndKey: pkEnd}}, + present: allColumns, + }, + { + // This test also uses qualified names for some fields. + stmt: "SELECT * FROM foo WHERE foo.a IN (5, 10, 20) AND b < 25", + expectSpans: roachpb.Spans{ + {Key: mkPkKey(t, fooID, 5), EndKey: mkPkKey(t, fooID, 5, 25)}, + {Key: mkPkKey(t, fooID, 10), EndKey: mkPkKey(t, fooID, 10, 25)}, + {Key: mkPkKey(t, fooID, 20), EndKey: mkPkKey(t, fooID, 20, 25)}, + }, + present: allColumns, + }, + { + // Currently, only primary index supported; so even when doing lookup + // on a non-primary index, we expect primary index to be scanned. + stmt: "SELECT * FROM foo WHERE c = 'unique'", + expectSpans: roachpb.Spans{primarySpan}, + present: allColumns, + }, + { + // Point lookup. + stmt: `SELECT a as apple, b as boy, pi()/2 as "halfPie" FROM foo WHERE (a = 5 AND b = 10)`, + expectSpans: roachpb.Spans{{Key: mkPkKey(t, fooID, 5, 10, 0)}}, + present: colinfo.ResultColumns{ + rc("apple", types.Int), rc("boy", types.Int), rc("halfPie", types.Float), + }, + }, + { + // Scope -- restrict columns + stmt: `SELECT * FROM (SELECT a, c FROM foo) AS foo`, + expectSpans: roachpb.Spans{primarySpan}, + present: colinfo.ResultColumns{rc("a", types.Int), rc("c", types.String)}, + }, + } { + t.Run(tc.stmt, func(t *testing.T) { + stmt, err := parser.ParseOne(tc.stmt) + require.NoError(t, err) + expr := stmt.AST.(*tree.Select) + + plan, err := PlanCDCExpression(ctx, p, expr) + if tc.expectErr != "" { + require.Regexp(t, tc.expectErr, err, err) + return + } + + require.NoError(t, err) + require.Equal(t, tc.expectSpans, plan.Spans) + checkPresentation(t, tc.present, plan.Presentation) + }) + } +} + +// Ensure the physical plan does not buffer results. +func TestChangefeedStreamsResults(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + params, _ := tests.CreateTestServerParams() + s, db, kvDB := serverutils.StartServer(t, params) + defer s.Stopper().Stop(context.Background()) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b int)`) + fooDesc := desctestutils.TestingGetTableDescriptor( + kvDB, keys.SystemSQLCodec, "defaultdb", "public", "foo") + + ctx := context.Background() + execCfg := s.ExecutorConfig().(ExecutorConfig) + p, cleanup := NewInternalPlanner("test", kv.NewTxn(ctx, kvDB, s.NodeID()), + username.RootUserName(), &MemoryMetrics{}, &execCfg, sessiondatapb.SessionData{ + Database: "defaultdb", + SearchPath: sessiondata.DefaultSearchPath.GetPathArray(), + }, + ) + defer cleanup() + stmt, err := parser.ParseOne("SELECT * FROM foo WHERE a < 10") + require.NoError(t, err) + expr := stmt.AST.(*tree.Select) + cdcPlan, err := PlanCDCExpression(ctx, p, expr) + require.NoError(t, err) + + cdcPlan.Plan.planNode, err = prepareCDCPlan(ctx, cdcPlan.Plan.planNode, + nil, catalog.ColumnIDToOrdinalMap(fooDesc.PublicColumns())) + require.NoError(t, err) + + planner := p.(*planner) + physPlan, physPlanCleanup, err := planner.DistSQLPlanner().createPhysPlan(ctx, cdcPlan.PlanCtx, cdcPlan.Plan) + defer physPlanCleanup() + require.NoError(t, err) + require.Equal(t, 1, len(physPlan.LocalProcessors)) + require.True(t, physPlan.LocalProcessors[0].MustBeStreaming()) +} + +func TestCdcExpressionExecution(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + params, _ := tests.CreateTestServerParams() + s, db, kvDB := serverutils.StartServer(t, params) + defer s.Stopper().Stop(context.Background()) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY, b INT, c STRING)`) + fooDesc := desctestutils.TestingGetTableDescriptor( + kvDB, keys.SystemSQLCodec, "defaultdb", "public", "foo") + + ctx := context.Background() + execCfg := s.ExecutorConfig().(ExecutorConfig) + p, cleanup := NewInternalPlanner("test", kv.NewTxn(ctx, kvDB, s.NodeID()), + username.RootUserName(), &MemoryMetrics{}, &execCfg, sessiondatapb.SessionData{ + Database: "defaultdb", + SearchPath: sessiondata.DefaultSearchPath.GetPathArray(), + }, + ) + defer cleanup() + planner := p.(*planner) + + for _, tc := range []struct { + name string + stmt string + expectRow func(row rowenc.EncDatumRow) (expected []string) + }{ + { + name: "star", + stmt: "SELECT * FROM foo WHERE a % 2 = 0", + expectRow: func(row rowenc.EncDatumRow) (expected []string) { + if tree.MustBeDInt(row[0].Datum)%2 == 0 { + a := tree.AsStringWithFlags(row[0].Datum, tree.FmtExport) + b := tree.AsStringWithFlags(row[1].Datum, tree.FmtExport) + c := tree.AsStringWithFlags(row[2].Datum, tree.FmtExport) + expected = append(expected, a, b, c) + } + return expected + }, + }, + { + name: "same columns", + stmt: "SELECT a, c, c, a FROM foo WHERE a % 2 = 0", + expectRow: func(row rowenc.EncDatumRow) (expected []string) { + if tree.MustBeDInt(row[0].Datum)%2 == 0 { + a := tree.AsStringWithFlags(row[0].Datum, tree.FmtExport) + c := tree.AsStringWithFlags(row[2].Datum, tree.FmtExport) + expected = append(expected, a, c, c, a) + } + return expected + }, + }, + { + name: "double c", + stmt: "SELECT concat(c, c) AS doubleC FROM foo WHERE c IS NOT NULL", + expectRow: func(row rowenc.EncDatumRow) (expected []string) { + if row[2].Datum != tree.DNull { + c := tree.AsStringWithFlags(row[2].Datum, tree.FmtExport) + expected = append(expected, c+c) + } + return expected + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + stmt, err := parser.ParseOne(tc.stmt) + require.NoError(t, err) + expr := stmt.AST.(*tree.Select) + var input execinfra.RowChannel + input.InitWithNumSenders([]*types.T{types.Int, types.Int, types.String}, 1) + plan, err := PlanCDCExpression(ctx, p, expr) + require.NoError(t, err) + + var rows [][]string + g := ctxgroup.WithContext(context.Background()) + + g.GoCtx(func(ctx context.Context) error { + writer := NewCallbackResultWriter(func(ctx context.Context, row tree.Datums) error { + strRow := make([]string, len(row)) + for i, d := range row { + strRow[i] = tree.AsStringWithFlags(d, tree.FmtExport) + } + rows = append(rows, strRow) + return nil + }) + + r := MakeDistSQLReceiver( + ctx, + writer, + tree.Rows, + planner.execCfg.RangeDescriptorCache, + nil, + nil, /* clockUpdater */ + planner.extendedEvalCtx.Tracing, + planner.execCfg.ContentionRegistry, + ) + defer r.Release() + + if err := RunCDCEvaluation(ctx, plan, &input, + catalog.ColumnIDToOrdinalMap(fooDesc.PublicColumns()), r); err != nil { + return err + } + return writer.Err() + }) + + rng, _ := randutil.NewTestRand() + + var expectedRows [][]string + for i := 0; i < 100; i++ { + row := randEncDatumRow(rng, fooDesc) + input.Push(row, nil) + if expected := tc.expectRow(row); expected != nil { + expectedRows = append(expectedRows, tc.expectRow(row)) + } + } + input.ProducerDone() + require.NoError(t, g.Wait()) + require.Equal(t, expectedRows, rows) + }) + } +} + +func randEncDatumRow(rng *rand.Rand, desc catalog.TableDescriptor) (row rowenc.EncDatumRow) { + for _, col := range desc.PublicColumns() { + if !col.IsVirtual() { + row = append(row, rowenc.EncDatum{Datum: randgen.RandDatum(rng, col.GetType(), col.IsNullable())}) + } + } + return row +} diff --git a/pkg/sql/instrumentation.go b/pkg/sql/instrumentation.go index b098bdc6c828..826413836936 100644 --- a/pkg/sql/instrumentation.go +++ b/pkg/sql/instrumentation.go @@ -736,9 +736,9 @@ func (ih *instrumentationHelper) SetIndexRecommendations( ) { f := opc.optimizer.Factory() evalCtx := opc.p.EvalContext() - f.Init(ctx, evalCtx, &opc.catalog) + f.Init(ctx, evalCtx, opc.catalog) f.FoldingControl().AllowStableFolds() - bld := optbuilder.New(ctx, &opc.p.semaCtx, evalCtx, &opc.catalog, f, opc.p.stmt.AST) + bld := optbuilder.New(ctx, &opc.p.semaCtx, evalCtx, opc.catalog, f, opc.p.stmt.AST) err := bld.Build() if err != nil { log.Warningf(ctx, "unable to build memo: %s", err) diff --git a/pkg/sql/opt/exec/execbuilder/scalar.go b/pkg/sql/opt/exec/execbuilder/scalar.go index e017e5e19780..4cec171709f4 100644 --- a/pkg/sql/opt/exec/execbuilder/scalar.go +++ b/pkg/sql/opt/exec/execbuilder/scalar.go @@ -723,7 +723,8 @@ func (b *Builder) buildUDF(ctx *buildScalarCtx, scalar opt.ScalarExpr) (tree.Typ // TODO(mgartner): Add support for WITH expressions inside UDF bodies. // TODO(mgartner): Add support for subqueries inside UDF bodies. ef := ref.(exec.Factory) - eb := New(ctx, ef, &o, f.Memo(), b.catalog, newRightSide, b.evalCtx, false /* allowAutoCommit */, b.IsANSIDML) + eb := New(ctx, ef, &o, f.Memo(), b.catalog, newRightSide, + b.evalCtx, b.semaCtx, false /* allowAutoCommit */, b.IsANSIDML) eb.disableTelemetry = true plan, err := eb.Build() if err != nil { diff --git a/pkg/sql/opt_catalog.go b/pkg/sql/opt_catalog.go index a03cb86b9d59..d047548101d5 100644 --- a/pkg/sql/opt_catalog.go +++ b/pkg/sql/opt_catalog.go @@ -69,7 +69,18 @@ type optCatalog struct { tn tree.TableName } -var _ cat.Catalog = &optCatalog{} +var _ cat.Catalog = (*optCatalog)(nil) + +// optPlanningCatalog is a thin wrapper over cat.Catalog +// with few additional planner specific methods. +type optPlanningCatalog interface { + cat.Catalog + init(planner *planner) + reset() + fullyQualifiedNameWithTxn( + ctx context.Context, ds cat.DataSource, txn *kv.Txn, + ) (cat.DataSourceName, error) +} // init initializes an optCatalog instance (which the caller can pre-allocate). // The instance can be used across multiple queries, but reset() should be diff --git a/pkg/sql/plan.go b/pkg/sql/plan.go index 5b3c5e97d1fb..bb9ad9de0bff 100644 --- a/pkg/sql/plan.go +++ b/pkg/sql/plan.go @@ -306,7 +306,7 @@ type planTop struct { // mem/catalog retains the memo and catalog that were used to create the // plan. Only set if needed by instrumentation (see ShouldSaveMemo). mem *memo.Memo - catalog *optCatalog + catalog optPlanningCatalog // auditEvents becomes non-nil if any of the descriptors used by // current statement is causing an auditing event. See exec_log.go. diff --git a/pkg/sql/plan_columns.go b/pkg/sql/plan_columns.go index 195e42036d36..3f8e0cca0d4f 100644 --- a/pkg/sql/plan_columns.go +++ b/pkg/sql/plan_columns.go @@ -167,6 +167,8 @@ func getPlanColumns(plan planNode, mut bool) colinfo.ResultColumns { } case *rowSourceToPlanNode: return n.planCols + case *cdcValuesNode: + return n.resultColumns } // Every other node has no columns in their results. diff --git a/pkg/sql/plan_node_to_row_source.go b/pkg/sql/plan_node_to_row_source.go index 047e1199230e..93f7bddae30e 100644 --- a/pkg/sql/plan_node_to_row_source.go +++ b/pkg/sql/plan_node_to_row_source.go @@ -89,10 +89,14 @@ func newPlanNodeToRowSource( // MustBeStreaming implements the execinfra.Processor interface. func (p *planNodeToRowSource) MustBeStreaming() bool { - // hookFnNode is special because it might be blocked forever if we decide to - // buffer its output. - _, isHookFnNode := p.node.(*hookFnNode) - return isHookFnNode + switch p.node.(type) { + case *hookFnNode, *cdcValuesNode: + // hookFnNode is special because it might be blocked forever if we decide to + // buffer its output. + return true + default: + return false + } } // InitWithOutput implements the LocalProcessor interface. diff --git a/pkg/sql/plan_opt.go b/pkg/sql/plan_opt.go index 3248e72b56aa..8af3ca0a1579 100644 --- a/pkg/sql/plan_opt.go +++ b/pkg/sql/plan_opt.go @@ -116,7 +116,7 @@ func (p *planner) prepareUsingOptimizer(ctx context.Context) (planFlags, error) // would be a better way to accomplish this goal. See CREATE TABLE for an // example. f := opc.optimizer.Factory() - bld := optbuilder.New(ctx, &p.semaCtx, p.EvalContext(), &opc.catalog, f, t.Select) + bld := optbuilder.New(ctx, &p.semaCtx, p.EvalContext(), opc.catalog, f, t.Select) if err := bld.Build(); err != nil { return opc.flags, err } @@ -130,7 +130,7 @@ func (p *planner) prepareUsingOptimizer(ctx context.Context) (planFlags, error) if !pm.TypeHints.Identical(p.semaCtx.Placeholders.TypeHints) { opc.log(ctx, "query cache hit but type hints don't match") } else { - isStale, err := cachedData.Memo.IsStale(ctx, p.EvalContext(), &opc.catalog) + isStale, err := cachedData.Memo.IsStale(ctx, p.EvalContext(), opc.catalog) if err != nil { return 0, err } @@ -315,7 +315,7 @@ type optPlanningCtx struct { // catalog is initialized once, and reset for each query. This allows the // catalog objects to be reused across queries in the same session. - catalog optCatalog + catalog optPlanningCatalog // -- Fields below are reinitialized for each query --- @@ -335,6 +335,7 @@ type optPlanningCtx struct { // also be called before each use. func (opc *optPlanningCtx) init(p *planner) { opc.p = p + opc.catalog = &optCatalog{} opc.catalog.init(p) } @@ -342,7 +343,7 @@ func (opc *optPlanningCtx) init(p *planner) { func (opc *optPlanningCtx) reset(ctx context.Context) { p := opc.p opc.catalog.reset() - opc.optimizer.Init(ctx, p.EvalContext(), &opc.catalog) + opc.optimizer.Init(ctx, p.EvalContext(), opc.catalog) opc.flags = 0 // We only allow memo caching for SELECT/INSERT/UPDATE/DELETE. We could @@ -416,7 +417,7 @@ func (opc *optPlanningCtx) buildReusableMemo(ctx context.Context) (_ *memo.Memo, // that there's even less to do during the EXECUTE phase. // f := opc.optimizer.Factory() - bld := optbuilder.New(ctx, &p.semaCtx, p.EvalContext(), &opc.catalog, f, opc.p.stmt.AST) + bld := optbuilder.New(ctx, &p.semaCtx, p.EvalContext(), opc.catalog, f, opc.p.stmt.AST) bld.KeepPlaceholders = true if err := bld.Build(); err != nil { return nil, err @@ -512,7 +513,7 @@ func (opc *optPlanningCtx) buildExecMemo(ctx context.Context) (_ *memo.Memo, _ e // If the prepared memo has been invalidated by schema or other changes, // re-prepare it. - if isStale, err := prepared.Memo.IsStale(ctx, p.EvalContext(), &opc.catalog); err != nil { + if isStale, err := prepared.Memo.IsStale(ctx, p.EvalContext(), opc.catalog); err != nil { return nil, err } else if isStale { opc.log(ctx, "rebuilding cached memo") @@ -530,7 +531,7 @@ func (opc *optPlanningCtx) buildExecMemo(ctx context.Context) (_ *memo.Memo, _ e // Consult the query cache. cachedData, ok := p.execCfg.QueryCache.Find(&p.queryCacheSession, opc.p.stmt.SQL) if ok { - if isStale, err := cachedData.Memo.IsStale(ctx, p.EvalContext(), &opc.catalog); err != nil { + if isStale, err := cachedData.Memo.IsStale(ctx, p.EvalContext(), opc.catalog); err != nil { return nil, err } else if isStale { opc.log(ctx, "query cache hit but needed update") @@ -560,7 +561,7 @@ func (opc *optPlanningCtx) buildExecMemo(ctx context.Context) (_ *memo.Memo, _ e // available. f := opc.optimizer.Factory() f.FoldingControl().AllowStableFolds() - bld := optbuilder.New(ctx, &p.semaCtx, p.EvalContext(), &opc.catalog, f, opc.p.stmt.AST) + bld := optbuilder.New(ctx, &p.semaCtx, p.EvalContext(), opc.catalog, f, opc.p.stmt.AST) if err := bld.Build(); err != nil { return nil, err } @@ -619,7 +620,7 @@ func (opc *optPlanningCtx) runExecBuilder( } var bld *execbuilder.Builder if !planTop.instrumentation.ShouldBuildExplainPlan() { - bld = execbuilder.New(ctx, f, &opc.optimizer, mem, &opc.catalog, mem.RootExpr(), + bld = execbuilder.New(ctx, f, &opc.optimizer, mem, opc.catalog, mem.RootExpr(), evalCtx, &opc.p.semaCtx, allowAutoCommit, stmt.IsANSIDML()) plan, err := bld.Build() if err != nil { @@ -629,7 +630,7 @@ func (opc *optPlanningCtx) runExecBuilder( } else { // Create an explain factory and record the explain.Plan. explainFactory := explain.NewFactory(f) - bld = execbuilder.New(ctx, explainFactory, &opc.optimizer, mem, &opc.catalog, mem.RootExpr(), + bld = execbuilder.New(ctx, explainFactory, &opc.optimizer, mem, opc.catalog, mem.RootExpr(), evalCtx, &opc.p.semaCtx, allowAutoCommit, stmt.IsANSIDML()) plan, err := bld.Build() if err != nil { @@ -688,7 +689,7 @@ func (opc *optPlanningCtx) runExecBuilder( } if planTop.instrumentation.ShouldSaveMemo() { planTop.mem = mem - planTop.catalog = &opc.catalog + planTop.catalog = opc.catalog } return nil } @@ -698,7 +699,7 @@ func (opc *optPlanningCtx) runExecBuilder( func (p *planner) DecodeGist(gist string, external bool) ([]string, error) { var cat cat.Catalog if !external { - cat = &p.optPlanningCtx.catalog + cat = p.optPlanningCtx.catalog } return explain.DecodePlanGistToRows(gist, cat) } @@ -754,7 +755,7 @@ func (opc *optPlanningCtx) makeQueryIndexRecommendation(ctx context.Context) (er // Optimize with the saved memo and hypothetical tables. Walk through the // optimal plan to determine index recommendations. - opc.optimizer.Init(ctx, f.EvalContext(), &opc.catalog) + opc.optimizer.Init(ctx, f.EvalContext(), opc.catalog) f.CopyAndReplace( savedMemo.RootExpr().(memo.RelExpr), savedMemo.RootProps(), @@ -770,7 +771,7 @@ func (opc *optPlanningCtx) makeQueryIndexRecommendation(ctx context.Context) (er // Re-initialize the optimizer (which also re-initializes the factory) and // update the saved memo's metadata with the original table information. // Prepare to re-optimize and create an executable plan. - opc.optimizer.Init(ctx, f.EvalContext(), &opc.catalog) + opc.optimizer.Init(ctx, f.EvalContext(), opc.catalog) savedMemo.Metadata().UpdateTableMeta(optTables) f.CopyAndReplace( savedMemo.RootExpr().(memo.RelExpr), diff --git a/pkg/sql/walk.go b/pkg/sql/walk.go index fcebc6b39fed..ae66d2dafd11 100644 --- a/pkg/sql/walk.go +++ b/pkg/sql/walk.go @@ -476,4 +476,5 @@ var planNodeNames = map[reflect.Type]string{ reflect.TypeOf(&zeroNode{}): "norows", reflect.TypeOf(&zigzagJoinNode{}): "zigzag join", reflect.TypeOf(&schemaChangePlanNode{}): "schema change", + reflect.TypeOf(&cdcValuesNode{}): "wrapped streaming node", }