Skip to content

Commit

Permalink
changefeedccl: Rely on optimizer and distSQL for expressions evaluation
Browse files Browse the repository at this point in the history
Previous PRs (#82562) introduced CDC expressions.

This PR builds on that and replaces majority of hand written
evaluation logic in favor of tighter integration with
optimizer and dist SQL processors.

CDC expression, which is really a simplified `SELECT` statement,
is now  planned by the optimizer `sql.PlanCDCExpression`.
The resulting plan is then fed to the distSQL, to produce
a specialized CDC execution plan (`sql.RunCDCEvaluation`).

The execution plan is special in that it is guaranteed to be
a local execution plan, and changefeed is expected to "feed"
the data (encoded row) directly into the execution pipeline,
with change aggregators consuming resulting projection.

The benefit of this approach is that expression optimization,
and evaluation is now handled by optimizer and distSQL.
The responsibility of CDC evaluation package is to make sure
that CDC expression is "sane" and to setup CDC specific functions.

Since the whole expression is not yet fully planned
by distSQL (i.e. we don't have changefeed operator implemented yet),
the integration between CDC expressions and optimizer/execInfra
is not yet complete.  In particular, this
PR does not replace current distSQL execution for CDC --
namely, we still keep the same execution model using hand planned
`ChangeFrontierProcessor` and `ChangeAggretagorProcessor`.
It augments existing model, while tightening the integration.

Still, this is an improvement over previous state.  The follow on work
will complete integration.

Some changes enabled by this implementation include the replacement
of `cdc_prev()` function which returned JSONb representation of the
previous row, with a `cdc_prev` tuple.  This makes changefeed
expressions more natural since tuples are strongly typed:

```
SELECT * FROM tbl WHERE col != cdc_prev.col`
```

In addition, by using tuple to represent the state of the previous row,
we can now leverage existing SQL functions.  For example, to emit
previus row as JSONb we can do:

```
SELECT *, row_to_json(cdc_prev) AS prevJson FROM tbl
```

Fixes #90416
Fixes #90714
Fixes #90455
Informs #90442
Informs CRDB-18978
Informs CRDB-17161

Release note (enterprise change): CDC expressions are now planned and
evaluated using SQL optimizer and distSQL execution. The state
of the previous row is now exposed as `cdc_prev` tuple.

Release note (backward-incompatible change): The replacement of
cdc_prev() function in favor a cdc_prev tuple is an incompatible
change that may break changefeeds that use old cdc_prev() function.
  • Loading branch information
Yevgeniy Miretskiy committed Dec 2, 2022
1 parent 34b99c1 commit b5cb990
Show file tree
Hide file tree
Showing 39 changed files with 2,133 additions and 2,406 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,6 @@ go_test(
"//pkg/sql/sem/tree",
"//pkg/sql/sem/volatility",
"//pkg/sql/sessiondata",
"//pkg/sql/sessiondatapb",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/sqllivenesstestutils",
"//pkg/sql/tests",
Expand Down
9 changes: 5 additions & 4 deletions pkg/ccl/changefeedccl/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
Expand Down Expand Up @@ -247,10 +248,10 @@ func createBenchmarkChangefeed(
if err != nil {
return nil, nil, err
}
serverCfg := s.DistSQLServer().(*distsql.ServerImpl).ServerConfig
eventConsumer, err := newKVEventToRowConsumer(ctx, &serverCfg, nil, sf, initialHighWater,
execCfg := s.ExecutorConfig().(sql.ExecutorConfig)
eventConsumer, err := newKVEventToRowConsumer(ctx, &execCfg, sf, initialHighWater,
sink, encoder, makeChangefeedConfigFromJobDetails(details),
execinfrapb.Expression{}, TestingKnobs{}, nil, nil)
execinfrapb.Expression{}, username.PublicRoleName(), TestingKnobs{}, nil, nil)

if err != nil {
return nil, nil, err
Expand Down
22 changes: 12 additions & 10 deletions pkg/ccl/changefeedccl/cdceval/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "cdceval",
srcs = [
"constraint.go",
"doc.go",
"expr_eval.go",
"func_resolver.go",
"functions.go",
"parse.go",
"plan.go",
"validation.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval",
Expand All @@ -19,27 +19,26 @@ go_library(
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/clusterversion",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/kv",
"//pkg/roachpb",
"//pkg/security/username",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/schemaexpr",
"//pkg/sql/catalog/descs",
"//pkg/sql/execinfra",
"//pkg/sql/parser",
"//pkg/sql/pgwire/pgcode",
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/catconstants",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/normalize",
"//pkg/sql/sem/tree",
"//pkg/sql/sem/volatility",
"//pkg/sql/sessiondata",
"//pkg/sql/sessiondatapb",
"//pkg/sql/types",
"//pkg/util/ctxgroup",
"//pkg/util/hlc",
"//pkg/util/json",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
"@com_github_lib_pq//oid",
],
Expand All @@ -52,6 +51,7 @@ go_test(
"func_resolver_test.go",
"functions_test.go",
"main_test.go",
"plan_test.go",
"validation_test.go",
],
args = ["-test.timeout=295s"],
Expand All @@ -62,26 +62,28 @@ go_test(
"//pkg/ccl/changefeedccl/cdctest",
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/jobs/jobspb",
"//pkg/keys",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/security/username",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/distsql",
"//pkg/sql/parser",
"//pkg/sql/randgen",
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/keyside",
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sessiondatapb",
"//pkg/sql/types",
"//pkg/testutils/serverutils",
"//pkg/testutils/sqlutils",
"//pkg/testutils/testcluster",
"//pkg/util/encoding",
"//pkg/util/hlc",
"//pkg/util/json",
"//pkg/util/leaktest",
Expand Down
106 changes: 0 additions & 106 deletions pkg/ccl/changefeedccl/cdceval/constraint.go

This file was deleted.

93 changes: 56 additions & 37 deletions pkg/ccl/changefeedccl/cdceval/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,47 +11,66 @@ package cdceval
/***
cdceval package is a library for evaluating various expressions in CDC.
Namely, this package concerns itself with 3 things:
* Filter evaluation -- aka predicates: does the event match boolean expression.
* Projection evaluation: given the set of projection expressions, evaluate them.
* (Soon to be added) Evaluation of computed virtual columns.
Evaluator is the gateway into the evaluation logic; it has 3 methods matching
the above use cases. Before filtering and projection can be used, Evaluator must
be configured with appropriate predicate and filtering expressions via configureProjection.
If the Evaluator is not configured with configureProjection, then each event is assumed
to match filter by default, and projection operation is an identity operation returning input
row.
Evaluator constructs a helper structure (exprEval) to perform actual evaluation.
One exprEval exists per cdcevent.EventDescriptor (currently, a new exprEval created
whenever event descriptor changes; we might have to add some sort of caching if needed).
Evaluation of projections and filter expressions are identical.
First, we have "compilation" phase:
1. Expression is "walked" to resolve the names and replace those names with tree.IndexedVar expressions.
The "index" part of the indexed var refers to the "index" of the datum in the row.
(note however: the row is abstracted under cdcevent package). IndexedVar allows the value of that
variable to be bound later once it is known; it also associates the type information
with that variable.
2. Expression is typed check to ensure that it is of the appropriate type.
* Projection expressions can be of tree.Any type, while filters must be tree.DBool.
3. Expression is then normalized (constants folded, etc).
It is an error to have a filter expression which evaluates to "false" -- in this case, Evaluator
will return a "contradiction" error.
After expressions "compiled", they can be evaluated; and again, both projections and filters use the same
logic (evalExpr() function); basically, all IndexedVars are bound to the Datums in the updated row, and the
expression is evaluated to the appropriate target type.
The expression evaluation and execution is integrated with planner/distSQL.
First, when starting changefeed, CDC expression is planned and normalized
(NormalizeAndPlan). The normalization step involves various sanity checks
(ensuring for example that expression does not target multiple column familes).
The expression is planned as well because this step, by leveraging optimizer,
may simplify expressions, and, more importantly, can detect if the expression
will not match any rows (this results in an error being returned).
The normalized expression is persisted into job record.
When changefeed job starts running, it once again plans expression execution.
Part of the planning stage figures out which spans need to be scanned. If the
predicate restricted primary index span, then we will scan only portion
of the table.
Then, once aggregators start up, they will once again plan the expression
(sql.PlanCDCExpression), but this time each incoming KV event will be evaluated
by DistSQL to produce final result (projection).
PlanCDCExpression fully integrates with optimizer, and produces a plan
that can then be used to execute the "flow" via normal
distSQL mechanism (PlanAndRun).
What makes CDC expression execution different is that CDC is responsible for
pushing the data into the execution pipeline. This is accomplished via
execinfra.RowReceiver which is returned as part of the plan.
CDC will receive rows (encoded datums) from rangefeed, and then "pushes" those
rows into execution pipeline.
CDC then reads the resulting projection via distSQL result writer.
Evaluator is the gateway into the evaluation logic; it takes care of running
execution flow, caching (to reuse the same plan as long as the descriptor version
does not change), etc.
Expressions can contain functions. We restrict the set of functions that can be used by CDC.
Volatile functions, window functions, aggregate functions are disallowed.
Certain stable functions (s.a. now(), current_timestamp(), etc) are allowed -- they will always
return the MVCC timestamp of the event.
We also provide custom, CDC specific functions, such as cdc_prev() which returns prevoius row as
a JSONB record. See functions.go for more details.
Access to the previous state of the row is accomplished via (typed) cdc_prev tuple.
This tuple can be used to build complex expressions around the previous state of the row:
SELECT * FROM foo WHERE status='active' AND cdc_prev.status='inactive'
During normalization stage, we determine if the expression has cdc_prev access.
If so, the expression is rewritten as:
SELECT ... FROM tbl, (SELECT ((crdb_internal.cdc_prev_row()).*)) AS cdc_prev
The crdb_internal.cdc_prev_row function is created to return a tuple based on
the previous table descriptor. Access to this function is arranged via custom
function resolver.
In addition, prior to evaluating CDC expression, the WHERE clause is rewritten as:
SELECT where, ... FROM tbl, ...
That is, WHERE clause is turned into a boolean datum. When projection results are
consumed, we can determine if the row ought to be filtered. This step is done to
ensure that we correctly release resources for each event -- even the ones that
are filtered out.
Virtual computed columns can be easily supported but currently are not.
To support virtual computed columns we must ensure that the expression in that
column references only the target changefeed column family.
***/
Loading

0 comments on commit b5cb990

Please sign in to comment.