From 6b2207e9dac82ed795cea453b77ffee8fbc5ce58 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Thu, 12 Jan 2023 16:11:57 -0500 Subject: [PATCH] changefeedccl: Add `changefeed_creation_timestamp` function Add `changefeed_creation_timestamp` function, which returns changefeed creation timestamp. Changefeed transformations restrict access to some of the standard functions, including `now()`. Without such a function, it is difficult to express changefeeds that want to emit events restricted by time. This function makes it possible to do this. For example, to create a changefeed that emits events from the `accounts` table whose `last_withdrawal` happened no earlier than 12 hours before the changefeed was created, one could do: ``` CREATE CHANGEFEED ... AS SELECT * FROM accounts WHERE last_withdrawal > changefeed_creation_timestamp() - interval '12 hours' ``` Epic: CRDB-17161 Release note (enterprise change): Changefeed expressions support the `changefeed_creation_timestamp` function. --- pkg/ccl/changefeedccl/cdceval/expr_eval.go | 14 +- .../changefeedccl/cdceval/expr_eval_test.go | 3 +- pkg/ccl/changefeedccl/cdceval/functions.go | 139 +++++++++--------- .../changefeedccl/cdceval/functions_test.go | 127 ++++++++++------ pkg/ccl/changefeedccl/event_processing.go | 2 +- pkg/sql/distsql_plan_changefeed.go | 4 + 6 files changed, 172 insertions(+), 117 deletions(-) diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval.go b/pkg/ccl/changefeedccl/cdceval/expr_eval.go index 12d32b37ed06..8e5073074b30 100644 --- a/pkg/ccl/changefeedccl/cdceval/expr_eval.go +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -35,6 +36,7 @@ import ( type Evaluator struct { sc 
*tree.SelectClause + statementTS hlc.Timestamp // Execution context. execCfg *sql.ExecutorConfig user username.SQLUsername @@ -79,14 +81,16 @@ func NewEvaluator( execCfg *sql.ExecutorConfig, user username.SQLUsername, sd sessiondatapb.SessionData, -) (*Evaluator, error) { + statementTS hlc.Timestamp, +) *Evaluator { return &Evaluator{ sc: sc, execCfg: execCfg, user: user, sessionData: sd, + statementTS: statementTS, familyEval: make(map[descpb.FamilyID]*familyEvaluator, 1), // usually, just 1 family. - }, nil + } } // NewEvaluator constructs new familyEvaluator for changefeed expression. @@ -96,6 +100,7 @@ func newFamilyEvaluator( execCfg *sql.ExecutorConfig, user username.SQLUsername, sd sessiondatapb.SessionData, + statementTS hlc.Timestamp, ) *familyEvaluator { e := familyEvaluator{ targetFamilyID: targetFamilyID, @@ -107,7 +112,7 @@ func newFamilyEvaluator( }, rowCh: make(chan tree.Datums, 1), } - + e.rowEvalCtx.startTime = statementTS // Arrange to be notified when event does not match predicate. predicateAsProjection(e.norm) @@ -138,7 +143,7 @@ func (e *Evaluator) Eval( fe, ok := e.familyEval[updatedRow.FamilyID] if !ok { - fe = newFamilyEvaluator(e.sc, updatedRow.FamilyID, e.execCfg, e.user, e.sessionData) + fe = newFamilyEvaluator(e.sc, updatedRow.FamilyID, e.execCfg, e.user, e.sessionData, e.statementTS) e.familyEval[updatedRow.FamilyID] = fe } @@ -468,6 +473,7 @@ func (e *familyEvaluator) closeErr() error { // rowEvalContext represents the context needed to evaluate row expressions. 
type rowEvalContext struct { ctx context.Context + startTime hlc.Timestamp updatedRow cdcevent.Row } diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go index cc9ff4c7453f..4223e100b6f5 100644 --- a/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval_test.go @@ -781,7 +781,8 @@ func newEvaluatorWithNormCheck( return nil, err } - return NewEvaluator(norm.SelectClause, execCfg, username.RootUserName(), defaultDBSessionData) + return NewEvaluator(norm.SelectClause, execCfg, username.RootUserName(), + defaultDBSessionData, hlc.Timestamp{}), nil } var defaultDBSessionData = sessiondatapb.SessionData{ diff --git a/pkg/ccl/changefeedccl/cdceval/functions.go b/pkg/ccl/changefeedccl/cdceval/functions.go index 103d805fc618..58c4c5fcd88e 100644 --- a/pkg/ccl/changefeedccl/cdceval/functions.go +++ b/pkg/ccl/changefeedccl/cdceval/functions.go @@ -56,12 +56,28 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{ // {statement,transaction}_timestamp functions can be supported given that we // set the statement and transaction timestamp to be equal to MVCC timestamp // of the event. However, we provide our own override which uses annotation to - // return the MVCC timestamp of the update. - "statement_timestamp": makeBuiltinOverride( - tree.FunDefs["statement_timestamp"], timestampBuiltinOverloads..., + // return the MVCC timestamp of the update. In addition, the custom + // implementation uses volatility.Volatile: a stable volatility would let the + // optimizer (constant) fold these functions during optimization step -- something we + // definitely don't want to do because we need to evaluate those functions for + // each event. 
+ "statement_timestamp": cdcTimestampBuiltin( + "statement_timestamp", + "Returns MVCC timestamp of the event", + volatility.Volatile, + types.TimestampTZ, + func(rowEvalCtx *rowEvalContext) hlc.Timestamp { + return rowEvalCtx.updatedRow.MvccTimestamp + }, ), - "transaction_timestamp": makeBuiltinOverride( - tree.FunDefs["transaction_timestamp"], timestampBuiltinOverloads..., + "transaction_timestamp": cdcTimestampBuiltin( + "transaction_timestamp", + "Returns MVCC timestamp of the event", + volatility.Volatile, + types.TimestampTZ, + func(rowEvalCtx *rowEvalContext) hlc.Timestamp { + return rowEvalCtx.updatedRow.MvccTimestamp + }, ), "timezone": useDefaultBuiltin, @@ -74,6 +90,11 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{ //"st_asgeojson", //"st_estimatedextent", + // NB: even though some cdc functions appear to be stable (e.g. cdc_is_delete()), + // we should not mark custom CDC functions as stable. Doing so will cause + // optimizer to (constant) fold this function during optimization step -- something + // we definitely don't want to do because we need to evaluate those functions + // for each event. "cdc_is_delete": makeCDCBuiltIn( "cdc_is_delete", tree.Overload{ @@ -86,28 +107,36 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{ } return tree.DBoolFalse, nil }, - Info: "Returns true if the event is a deletion", - // NB: even though some cdc functions appear to be stable (e.g. cdc_is_delete()), - // we should not mark custom CDC functions as stable. Doing so will cause - // optimizer to (constant) fold this function during optimization step -- something - // we definitely don't want to do because we need to evaluate those functions - // for each event. 
+ Info: "Returns true if the event is a deletion", Volatility: volatility.Volatile, }), "cdc_mvcc_timestamp": cdcTimestampBuiltin( "cdc_mvcc_timestamp", - "Returns event MVCC HLC timestamp", + "Returns MVCC timestamp of the event", + volatility.Volatile, + types.Decimal, func(rowEvalCtx *rowEvalContext) hlc.Timestamp { return rowEvalCtx.updatedRow.MvccTimestamp }, ), "cdc_updated_timestamp": cdcTimestampBuiltin( "cdc_updated_timestamp", - "Returns event updated HLC timestamp", + "Returns schema timestamp of the event", + volatility.Volatile, + types.Decimal, func(rowEvalCtx *rowEvalContext) hlc.Timestamp { return rowEvalCtx.updatedRow.SchemaTS }, ), + "changefeed_creation_timestamp": cdcTimestampBuiltin( + "changefeed_creation_timestamp", + "Returns changefeed creation time", + volatility.Stable, + types.Decimal, + func(rowEvalCtx *rowEvalContext) hlc.Timestamp { + return rowEvalCtx.startTime + }, + ), } const cdcFnCategory = "CDC builtin" @@ -125,7 +154,11 @@ func makeCDCBuiltIn(fnName string, overloads ...tree.Overload) *tree.ResolvedFun } func cdcTimestampBuiltin( - fnName string, doc string, tsFn func(rowEvalCtx *rowEvalContext) hlc.Timestamp, + fnName string, + doc string, + v volatility.V, + preferredOverloadReturnType *types.T, + tsFn func(rowEvalCtx *rowEvalContext) hlc.Timestamp, ) *tree.ResolvedFunctionDefinition { def := tree.NewFunctionDefinition( fnName, @@ -138,16 +171,35 @@ func cdcTimestampBuiltin( rowEvalCtx := rowEvalContextFromEvalContext(evalCtx) return eval.TimestampToDecimalDatum(tsFn(rowEvalCtx)), nil }, - Info: doc, - // NB: even though some cdc functions appear to be stable (e.g. cdc_is_delete()), - // we should not mark custom CDC functions as stable. Doing so will cause - // optimizer to (constant) fold this function during optimization step -- something - // we definitely don't want to do because we need to evaluate those functions - // for each event. 
- Volatility: volatility.Volatile, + Info: doc + " as HLC timestamp", + Volatility: v, + PreferredOverload: preferredOverloadReturnType == types.Decimal, + }, + { + Types: tree.ParamTypes{}, + ReturnType: tree.FixedReturnType(types.TimestampTZ), + Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { + rowEvalCtx := rowEvalContextFromEvalContext(evalCtx) + return tree.MakeDTimestampTZ(tsFn(rowEvalCtx).GoTime(), time.Microsecond) + }, + Info: doc + " as TIMESTAMPTZ", + Volatility: v, + PreferredOverload: preferredOverloadReturnType == types.TimestampTZ, + }, + { + Types: tree.ParamTypes{}, + ReturnType: tree.FixedReturnType(types.Timestamp), + Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { + rowEvalCtx := rowEvalContextFromEvalContext(evalCtx) + return tree.MakeDTimestamp(tsFn(rowEvalCtx).GoTime(), time.Microsecond) + }, + Info: doc + " as TIMESTAMP", + Volatility: v, + PreferredOverload: preferredOverloadReturnType == types.Timestamp, }, }, ) + // The schema name is actually not important since CDC doesn't use any user // defined functions. And, we're sure that we always return the first // function definition found. @@ -185,51 +237,6 @@ func TestingDisableFunctionsBlacklist() func() { } } -// For some functions (specifically the volatile ones), we do -// not want to use the provided builtin. Instead, we opt for -// our own function definition. -func makeBuiltinOverride( - builtin *tree.FunctionDefinition, overloads ...tree.Overload, -) *tree.ResolvedFunctionDefinition { - props := builtin.FunctionProperties - override := tree.NewFunctionDefinition(builtin.Name, &props, overloads) - // The schema name is actually not important since CDC doesn't use any user - // defined functions. And, we're sure that we always return the first - // function definition found. 
- return tree.QualifyBuiltinFunctionDefinition(override, catconstants.PublicSchemaName) -} - -// tree.Overload definitions for statement_timestamp and transaction_timestamp functions. -var timestampBuiltinOverloads = []tree.Overload{ - { - Types: tree.ParamTypes{}, - ReturnType: tree.FixedReturnType(types.TimestampTZ), - PreferredOverload: true, - Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { - rowEvalCtx := rowEvalContextFromEvalContext(evalCtx) - return tree.MakeDTimestampTZ(rowEvalCtx.updatedRow.MvccTimestamp.GoTime(), time.Microsecond) - }, - Info: "Returns MVCC timestamp of the event", - // NB: Default builtin implementation uses volatility.Stable - // We override volatility to be Volatile so that function - // is not folded. - Volatility: volatility.Volatile, - }, - { - Types: tree.ParamTypes{}, - ReturnType: tree.FixedReturnType(types.Timestamp), - Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { - rowEvalCtx := rowEvalContextFromEvalContext(evalCtx) - return tree.MakeDTimestamp(rowEvalCtx.updatedRow.MvccTimestamp.GoTime(), time.Microsecond) - }, - Info: "Returns MVCC timestamp of the event", - // NB: Default builtin implementation uses volatility.Stable - // We override volatility to be Volatile so that function - // is not folded. - Volatility: volatility.Volatile, - }, -} - var functionDenyList = make(map[string]struct{}) func init() { diff --git a/pkg/ccl/changefeedccl/cdceval/functions_test.go b/pkg/ccl/changefeedccl/cdceval/functions_test.go index 5e1d4ff88076..9683cfa1bff2 100644 --- a/pkg/ccl/changefeedccl/cdceval/functions_test.go +++ b/pkg/ccl/changefeedccl/cdceval/functions_test.go @@ -53,45 +53,106 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { defer configSemaForCDC(&semaCtx)() t.Run("time", func(t *testing.T) { - // We'll run tests against some future time stamp to ensure - // that time functions use correct values. 
- futureTS := s.Clock().Now().Add(int64(60*time.Minute), 0) - expectTSTZ := func(ts hlc.Timestamp) string { + expectTSTZ := func(ts hlc.Timestamp) tree.Datum { t.Helper() d, err := tree.MakeDTimestampTZ(ts.GoTime(), time.Microsecond) require.NoError(t, err) - return tree.AsStringWithFlags(d, tree.FmtExport) + return d + } + expectTS := func(ts hlc.Timestamp) tree.Datum { + t.Helper() + d, err := tree.MakeDTimestamp(ts.GoTime(), time.Microsecond) + require.NoError(t, err) + return d + } + expectHLC := func(ts hlc.Timestamp) tree.Datum { + t.Helper() + return eval.TimestampToDecimalDatum(ts) } - for _, tc := range []struct { - fn string - expect string - }{ - {fn: "statement_timestamp", expect: expectTSTZ(futureTS)}, - {fn: "transaction_timestamp", expect: expectTSTZ(futureTS)}, + type preferredFn func(ts hlc.Timestamp) tree.Datum + for fn, preferredOverload := range map[string]preferredFn{ + "statement_timestamp": expectTSTZ, + "transaction_timestamp": expectTSTZ, + "cdc_mvcc_timestamp": expectHLC, + "cdc_updated_timestamp": expectHLC, + "changefeed_creation_timestamp": expectHLC, } { - t.Run(tc.fn, func(t *testing.T) { - testRow := makeEventRow(t, desc, s.Clock().Now(), false, futureTS) + t.Run(fn, func(t *testing.T) { + createTS := s.Clock().Now().Add(-int64(60*time.Minute), 0) + schemaTS := s.Clock().Now() + rowTS := schemaTS.Add(int64(60*time.Minute), 0) + + targetTS := rowTS + switch fn { + case "cdc_updated_timestamp": + targetTS = schemaTS + case "changefeed_creation_timestamp": + targetTS = createTS + } + // We'll run tests against some future time stamp to ensure + // that time functions use correct values. + testRow := makeEventRow(t, desc, schemaTS, false, rowTS) e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, - fmt.Sprintf("SELECT %s() FROM foo", tc.fn)) + fmt.Sprintf("SELECT "+ + "%[1]s() AS preferred,"+ // Preferred overload. + "%[1]s():::TIMESTAMPTZ AS tstz,"+ // Force timestamptz overload. 
+ "%[1]s():::TIMESTAMP AS ts,"+ // Force timestamp overload. + "%[1]s():::DECIMAL AS dec,"+ // Force decimal overload. + "%[1]s()::STRING AS str"+ // Casts preferred overload to string. + " FROM foo", fn)) require.NoError(t, err) defer e.Close() + e.statementTS = createTS p, err := e.Eval(ctx, testRow, cdcevent.Row{}) require.NoError(t, err) - require.Equal(t, map[string]string{tc.fn: tc.expect}, slurpValues(t, p)) - // Emit again, this time advancing MVCC timestamp of the row. - // We want to make sure that optimizer did not constant fold the call - // to the function, even though this function is marked stable. + initialExpectations := map[string]string{ + "preferred": tree.AsStringWithFlags(preferredOverload(targetTS), tree.FmtExport), + "tstz": tree.AsStringWithFlags(expectTSTZ(targetTS), tree.FmtExport), + "ts": tree.AsStringWithFlags(expectTS(targetTS), tree.FmtExport), + "dec": tree.AsStringWithFlags(eval.TimestampToDecimalDatum(targetTS), tree.FmtExport), + "str": tree.AsStringWithFlags(preferredOverload(targetTS), tree.FmtExport), + } + require.Equal(t, initialExpectations, slurpValues(t, p)) + + // Modify row/schema timestamps, and evaluate again. testRow.MvccTimestamp = testRow.MvccTimestamp.Add(int64(time.Hour), 0) + targetTS = testRow.MvccTimestamp + testRow.SchemaTS = schemaTS.Add(1, 0) + e.statementTS = e.statementTS.Add(-1, 0) p, err = e.Eval(ctx, testRow, cdcevent.Row{}) require.NoError(t, err) - require.Equal(t, map[string]string{tc.fn: expectTSTZ(testRow.MvccTimestamp)}, slurpValues(t, p)) + + var updatedExpectations map[string]string + switch fn { + case "changefeed_creation_timestamp": + // this function is stable; So, advancing evaluator timestamp + // should have no bearing on the returned values -- we should see + // the same thing we saw before. 
+ updatedExpectations = initialExpectations + case "cdc_updated_timestamp": + targetTS = testRow.SchemaTS + fallthrough + default: + updatedExpectations = map[string]string{ + "preferred": tree.AsStringWithFlags(preferredOverload(targetTS), tree.FmtExport), + "tstz": tree.AsStringWithFlags(expectTSTZ(targetTS), tree.FmtExport), + "ts": tree.AsStringWithFlags(expectTS(targetTS), tree.FmtExport), + "dec": tree.AsStringWithFlags(eval.TimestampToDecimalDatum(targetTS), tree.FmtExport), + "str": tree.AsStringWithFlags(preferredOverload(targetTS), tree.FmtExport), + } + } + require.Equal(t, updatedExpectations, slurpValues(t, p)) }) } t.Run("timezone", func(t *testing.T) { + // We'll run tests against some future time stamp to ensure + // that time functions use correct values. + futureTS := s.Clock().Now().Add(int64(60*time.Minute), 0) + // Timezone has many overrides, some are immutable, and some are Stable. // Call "stable" overload which relies on session data containing // timezone. Since we don't do any special setup with session data, the @@ -138,31 +199,6 @@ func TestEvaluatesCDCFunctionOverloads(t *testing.T) { return j } - t.Run("cdc_{mvcc,updated}_timestamp", func(t *testing.T) { - for _, cast := range []string{"", "::decimal", "::string"} { - t.Run(cast, func(t *testing.T) { - schemaTS := s.Clock().Now() - mvccTS := schemaTS.Add(int64(30*time.Minute), 0) - testRow := makeEventRow(t, desc, schemaTS, false, mvccTS) - e, err := newEvaluator(&execCfg, &semaCtx, testRow.EventDescriptor, fmt.Sprintf( - "SELECT cdc_mvcc_timestamp()%[1]s as mvcc, cdc_updated_timestamp()%[1]s as updated FROM foo", cast, - )) - require.NoError(t, err) - defer e.Close() - - p, err := e.Eval(ctx, testRow, cdcevent.Row{}) - require.NoError(t, err) - require.Equal(t, - map[string]string{ - "mvcc": eval.TimestampToDecimalDatum(mvccTS).String(), - "updated": eval.TimestampToDecimalDatum(schemaTS).String(), - }, - slurpValues(t, p), - ) - }) - } - }) - t.Run("pg_collation_for", func(t 
*testing.T) { testRow := makeEventRow(t, desc, s.Clock().Now(), false, s.Clock().Now()) @@ -346,5 +382,6 @@ func newEvaluator( if err != nil { return nil, err } - return NewEvaluator(norm.SelectClause, execCfg, username.RootUserName(), defaultDBSessionData) + return NewEvaluator(norm.SelectClause, execCfg, username.RootUserName(), + defaultDBSessionData, execCfg.Clock.Now()), nil } diff --git a/pkg/ccl/changefeedccl/event_processing.go b/pkg/ccl/changefeedccl/event_processing.go index 627da022f07b..ef44236c3976 100644 --- a/pkg/ccl/changefeedccl/event_processing.go +++ b/pkg/ccl/changefeedccl/event_processing.go @@ -290,7 +290,7 @@ func newEvaluator( sd = *spec.Feed.SessionData } - return cdceval.NewEvaluator(sc, cfg, spec.User(), sd) + return cdceval.NewEvaluator(sc, cfg, spec.User(), sd, spec.Feed.StatementTime), nil } func (c *kvEventToRowConsumer) topicForEvent(eventMeta cdcevent.Metadata) (TopicDescriptor, error) { diff --git a/pkg/sql/distsql_plan_changefeed.go b/pkg/sql/distsql_plan_changefeed.go index 88c549fe99b0..0f1c0981f523 100644 --- a/pkg/sql/distsql_plan_changefeed.go +++ b/pkg/sql/distsql_plan_changefeed.go @@ -95,13 +95,17 @@ func PlanCDCExpression( if err != nil { return cdcPlan, err } + cdcCat := &cdcOptCatalog{ optCatalog: opc.catalog.(*optCatalog), cdcConfig: cfg, targetFamilyID: familyID, semaCtx: &p.semaCtx, } + + // Reset catalog to cdc specific implementation. opc.catalog = cdcCat + opc.optimizer.Init(ctx, p.EvalContext(), opc.catalog) memo, err := opc.buildExecMemo(ctx) if err != nil {