Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: Add changefeed_creation_timestamp function #95179

Merged
merged 1 commit into from
Jan 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions pkg/ccl/changefeedccl/cdceval/expr_eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
Expand All @@ -35,6 +36,7 @@ import (
type Evaluator struct {
sc *tree.SelectClause

statementTS hlc.Timestamp
// Execution context.
execCfg *sql.ExecutorConfig
user username.SQLUsername
Expand Down Expand Up @@ -79,14 +81,16 @@ func NewEvaluator(
execCfg *sql.ExecutorConfig,
user username.SQLUsername,
sd sessiondatapb.SessionData,
) (*Evaluator, error) {
statementTS hlc.Timestamp,
) *Evaluator {
return &Evaluator{
sc: sc,
execCfg: execCfg,
user: user,
sessionData: sd,
statementTS: statementTS,
familyEval: make(map[descpb.FamilyID]*familyEvaluator, 1), // usually, just 1 family.
}, nil
}
}

// NewEvaluator constructs new familyEvaluator for changefeed expression.
Expand All @@ -96,6 +100,7 @@ func newFamilyEvaluator(
execCfg *sql.ExecutorConfig,
user username.SQLUsername,
sd sessiondatapb.SessionData,
statementTS hlc.Timestamp,
) *familyEvaluator {
e := familyEvaluator{
targetFamilyID: targetFamilyID,
Expand All @@ -107,7 +112,7 @@ func newFamilyEvaluator(
},
rowCh: make(chan tree.Datums, 1),
}

e.rowEvalCtx.startTime = statementTS
// Arrange to be notified when event does not match predicate.
predicateAsProjection(e.norm)

Expand Down Expand Up @@ -138,7 +143,7 @@ func (e *Evaluator) Eval(

fe, ok := e.familyEval[updatedRow.FamilyID]
if !ok {
fe = newFamilyEvaluator(e.sc, updatedRow.FamilyID, e.execCfg, e.user, e.sessionData)
fe = newFamilyEvaluator(e.sc, updatedRow.FamilyID, e.execCfg, e.user, e.sessionData, e.statementTS)
e.familyEval[updatedRow.FamilyID] = fe
}

Expand Down Expand Up @@ -468,6 +473,7 @@ func (e *familyEvaluator) closeErr() error {
// rowEvalContext represents the context needed to evaluate row expressions.
type rowEvalContext struct {
ctx context.Context
startTime hlc.Timestamp
updatedRow cdcevent.Row
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/cdceval/expr_eval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -781,7 +781,8 @@ func newEvaluatorWithNormCheck(
return nil, err
}

return NewEvaluator(norm.SelectClause, execCfg, username.RootUserName(), defaultDBSessionData)
return NewEvaluator(norm.SelectClause, execCfg, username.RootUserName(),
defaultDBSessionData, hlc.Timestamp{}), nil
}

var defaultDBSessionData = sessiondatapb.SessionData{
Expand Down
139 changes: 73 additions & 66 deletions pkg/ccl/changefeedccl/cdceval/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,28 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{
// {statement,transaction}_timestamp functions can be supported given that we
// set the statement and transaction timestamp to be equal to MVCC timestamp
// of the event. However, we provide our own override which uses annotation to
// return the MVCC timestamp of the update.
"statement_timestamp": makeBuiltinOverride(
tree.FunDefs["statement_timestamp"], timestampBuiltinOverloads...,
// return the MVCC timestamp of the update. In addition, the custom
// implementation uses volatility.Volatile since doing so will cause optimizer
// to (constant) fold these functions during optimization step -- something we
// definitely don't want to do because we need to evaluate those functions for
// each event.
"statement_timestamp": cdcTimestampBuiltin(
"statement_timestamp",
"Returns MVCC timestamp of the event",
volatility.Volatile,
types.TimestampTZ,
func(rowEvalCtx *rowEvalContext) hlc.Timestamp {
return rowEvalCtx.updatedRow.MvccTimestamp
},
),
"transaction_timestamp": makeBuiltinOverride(
tree.FunDefs["transaction_timestamp"], timestampBuiltinOverloads...,
"transaction_timestamp": cdcTimestampBuiltin(
"transaction_timestamp",
"Returns MVCC timestamp of the event",
volatility.Volatile,
types.TimestampTZ,
func(rowEvalCtx *rowEvalContext) hlc.Timestamp {
return rowEvalCtx.updatedRow.MvccTimestamp
},
),

"timezone": useDefaultBuiltin,
Expand All @@ -74,6 +90,11 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{
//"st_asgeojson",
//"st_estimatedextent",

// NB: even though some cdc functions appear to be stable (e.g. cdc_is_delete()),
// we should not mark custom CDC functions as stable. Doing so will cause
// optimizer to (constant) fold this function during optimization step -- something
// we definitely don't want to do because we need to evaluate those functions
// for each event.
"cdc_is_delete": makeCDCBuiltIn(
"cdc_is_delete",
tree.Overload{
Expand All @@ -86,28 +107,36 @@ var cdcFunctions = map[string]*tree.ResolvedFunctionDefinition{
}
return tree.DBoolFalse, nil
},
Info: "Returns true if the event is a deletion",
// NB: even though some cdc functions appear to be stable (e.g. cdc_is_delete()),
// we should not mark custom CDC functions as stable. Doing so will cause
// optimizer to (constant) fold this function during optimization step -- something
// we definitely don't want to do because we need to evaluate those functions
// for each event.
Info: "Returns true if the event is a deletion",
Volatility: volatility.Volatile,
}),
"cdc_mvcc_timestamp": cdcTimestampBuiltin(
"cdc_mvcc_timestamp",
"Returns event MVCC HLC timestamp",
"Returns MVCC timestamp of the event",
volatility.Volatile,
types.Decimal,
func(rowEvalCtx *rowEvalContext) hlc.Timestamp {
return rowEvalCtx.updatedRow.MvccTimestamp
},
),
"cdc_updated_timestamp": cdcTimestampBuiltin(
"cdc_updated_timestamp",
"Returns event updated HLC timestamp",
"Returns schema timestamp of the event",
volatility.Volatile,
types.Decimal,
func(rowEvalCtx *rowEvalContext) hlc.Timestamp {
return rowEvalCtx.updatedRow.SchemaTS
},
),
"changefeed_creation_timestamp": cdcTimestampBuiltin(
"changefeed_creation_timestamp",
"Returns changefeed creation time",
volatility.Stable,
types.Decimal,
func(rowEvalCtx *rowEvalContext) hlc.Timestamp {
return rowEvalCtx.startTime
},
),
}

const cdcFnCategory = "CDC builtin"
Expand All @@ -125,7 +154,11 @@ func makeCDCBuiltIn(fnName string, overloads ...tree.Overload) *tree.ResolvedFun
}

func cdcTimestampBuiltin(
fnName string, doc string, tsFn func(rowEvalCtx *rowEvalContext) hlc.Timestamp,
fnName string,
doc string,
v volatility.V,
preferredOverloadReturnType *types.T,
tsFn func(rowEvalCtx *rowEvalContext) hlc.Timestamp,
) *tree.ResolvedFunctionDefinition {
def := tree.NewFunctionDefinition(
fnName,
Expand All @@ -138,16 +171,35 @@ func cdcTimestampBuiltin(
rowEvalCtx := rowEvalContextFromEvalContext(evalCtx)
return eval.TimestampToDecimalDatum(tsFn(rowEvalCtx)), nil
},
Info: doc,
// NB: even though some cdc functions appear to be stable (e.g. cdc_is_delete()),
// we should not mark custom CDC functions as stable. Doing so will cause
// optimizer to (constant) fold this function during optimization step -- something
// we definitely don't want to do because we need to evaluate those functions
// for each event.
Volatility: volatility.Volatile,
Info: doc + " as HLC timestamp",
Volatility: v,
PreferredOverload: preferredOverloadReturnType == types.Decimal,
},
{
Types: tree.ParamTypes{},
ReturnType: tree.FixedReturnType(types.TimestampTZ),
Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
rowEvalCtx := rowEvalContextFromEvalContext(evalCtx)
return tree.MakeDTimestampTZ(tsFn(rowEvalCtx).GoTime(), time.Microsecond)
},
Info: doc + " as TIMESTAMPTZ",
Volatility: v,
PreferredOverload: preferredOverloadReturnType == types.TimestampTZ,
},
{
Types: tree.ParamTypes{},
ReturnType: tree.FixedReturnType(types.Timestamp),
Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
rowEvalCtx := rowEvalContextFromEvalContext(evalCtx)
return tree.MakeDTimestamp(tsFn(rowEvalCtx).GoTime(), time.Microsecond)
},
Info: doc + " as TIMESTAMP",
Volatility: v,
PreferredOverload: preferredOverloadReturnType == types.Timestamp,
},
},
)

// The schema name is actually not important since CDC doesn't use any user
// defined functions. And, we're sure that we always return the first
// function definition found.
Expand Down Expand Up @@ -185,51 +237,6 @@ func TestingDisableFunctionsBlacklist() func() {
}
}

// For some functions (specifically the volatile ones), we do
// not want to use the provided builtin. Instead, we opt for
// our own function definition.
func makeBuiltinOverride(
builtin *tree.FunctionDefinition, overloads ...tree.Overload,
) *tree.ResolvedFunctionDefinition {
props := builtin.FunctionProperties
override := tree.NewFunctionDefinition(builtin.Name, &props, overloads)
// The schema name is actually not important since CDC doesn't use any user
// defined functions. And, we're sure that we always return the first
// function definition found.
return tree.QualifyBuiltinFunctionDefinition(override, catconstants.PublicSchemaName)
}

// tree.Overload definitions for statement_timestamp and transaction_timestamp functions.
var timestampBuiltinOverloads = []tree.Overload{
{
Types: tree.ParamTypes{},
ReturnType: tree.FixedReturnType(types.TimestampTZ),
PreferredOverload: true,
Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
rowEvalCtx := rowEvalContextFromEvalContext(evalCtx)
return tree.MakeDTimestampTZ(rowEvalCtx.updatedRow.MvccTimestamp.GoTime(), time.Microsecond)
},
Info: "Returns MVCC timestamp of the event",
// NB: Default builtin implementation uses volatility.Stable
// We override volatility to be Volatile so that function
// is not folded.
Volatility: volatility.Volatile,
},
{
Types: tree.ParamTypes{},
ReturnType: tree.FixedReturnType(types.Timestamp),
Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
rowEvalCtx := rowEvalContextFromEvalContext(evalCtx)
return tree.MakeDTimestamp(rowEvalCtx.updatedRow.MvccTimestamp.GoTime(), time.Microsecond)
},
Info: "Returns MVCC timestamp of the event",
// NB: Default builtin implementation uses volatility.Stable
// We override volatility to be Volatile so that function
// is not folded.
Volatility: volatility.Volatile,
},
}

var functionDenyList = make(map[string]struct{})

func init() {
Expand Down
Loading