Skip to content

Commit

Permalink
changefeedccl: add high-level recovers in cdceval
Browse files Browse the repository at this point in the history
SQL is complicated. If we hit a panic while parsing or evaluating SQL,
we need to fail the changefeed, not crash the node.

Informs #90416.

Release note (sql change): Fixed a bug that could cause crashes when parsing malformed changefeed expressions.
  • Loading branch information
HonoreDB committed Oct 25, 2022
1 parent 71beb3a commit e1d2103
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 4 deletions.
21 changes: 18 additions & 3 deletions pkg/ccl/changefeedccl/cdceval/expr_eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,12 @@ func (e *Evaluator) ComputeVirtualColumns(ctx context.Context, row *cdcevent.Row
// MatchesFilter returns true if row matches evaluator filter expression.
func (e *Evaluator) MatchesFilter(
ctx context.Context, updatedRow cdcevent.Row, mvccTS hlc.Timestamp, prevRow cdcevent.Row,
) (bool, error) {
) (_ bool, err error) {
defer func() {
if pan := recover(); pan != nil {
err = errors.Newf("error while evaluating WHERE clause: %s", pan)
}
}()
if e.where == nil {
return true, nil
}
Expand All @@ -86,7 +91,12 @@ func (e *Evaluator) MatchesFilter(
// Returns cdcevent.Row representing evalProjection.
func (e *Evaluator) Projection(
ctx context.Context, updatedRow cdcevent.Row, mvccTS hlc.Timestamp, prevRow cdcevent.Row,
) (cdcevent.Row, error) {
) (_ cdcevent.Row, err error) {
defer func() {
if pan := recover(); pan != nil {
err = errors.Newf("error while evaluating SELECT clause: %s", pan)
}
}()
if len(e.selectors) == 0 {
return updatedRow, nil
}
Expand All @@ -99,7 +109,12 @@ func (e *Evaluator) Projection(
}

// initSelectClause configures this evaluator to evaluate specified select clause.
func (e *Evaluator) initSelectClause(ctx context.Context, sc *tree.SelectClause) error {
func (e *Evaluator) initSelectClause(ctx context.Context, sc *tree.SelectClause) (err error) {
defer func() {
if pan := recover(); pan != nil {
err = errors.Newf("error while validating CHANGEFEED expression: %s", pan)
}
}()
if len(sc.Exprs) == 0 { // Shouldn't happen, but be defensive.
return pgerror.New(pgcode.InvalidParameterValue,
"expected at least 1 projection")
Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/changefeedccl/cdceval/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,10 @@ func makeStringSet(vals ...string) map[string]struct{} {
}
return m
}

// TestingEnableVolatileFunction allows functions with the given name (lowercase)
// to be used in expressions if their volatility level would disallow them by default.
// Used for testing.
func TestingEnableVolatileFunction(fnName string) {
supportedVolatileBuiltinFunctions[fnName] = struct{}{}
}
7 changes: 6 additions & 1 deletion pkg/ccl/changefeedccl/cdceval/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,12 @@ func NormalizeAndValidateSelectForTarget(
sc *tree.SelectClause,
includeVirtual bool,
splitColFams bool,
) (n NormalizedSelectClause, _ jobspb.ChangefeedTargetSpecification, _ error) {
) (n NormalizedSelectClause, _ jobspb.ChangefeedTargetSpecification, retErr error) {
defer func() {
if pan := recover(); pan != nil {
retErr = errors.Newf("low-level error while normalizing expression, probably syntax is unsupported in CREATE CHANGEFEED: %s", pan)
}
}()
execCtx.SemaCtx()
execCfg := execCtx.ExecCfg()
if !execCfg.Settings.Version.IsActive(ctx, clusterversion.EnablePredicateProjectionChangefeed) {
Expand Down
49 changes: 49 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4443,6 +4443,55 @@ func TestChangefeedDescription(t *testing.T) {

}

func TestChangefeedPanicRecovery(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
// Panics can mess with the test setup so run these each in their own test.

prep := func(t *testing.T, sqlDB *sqlutils.SQLRunner) {
cdceval.TestingEnableVolatileFunction(`crdb_internal.force_panic`)
sqlDB.Exec(t, `CREATE TABLE foo(id int primary key, s string)`)
sqlDB.Exec(t, `INSERT INTO foo(id, s) VALUES (0, 'hello'), (1, null)`)
}

cdcTest(t, func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
prep(t, sqlDB)
// Check that disallowed expressions have a good error message.
// Also regression test for https://github.com/cockroachdb/cockroach/issues/90416
sqlDB.ExpectErr(t, "syntax is unsupported in CREATE CHANGEFEED",
`CREATE CHANGEFEED WITH schema_change_policy='stop' AS SELECT 1 FROM foo WHERE EXISTS (SELECT true)`)
})

// Check that all panics while evaluating the WHERE clause in an expression are recovered from.
cdcTest(t, func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
prep(t, sqlDB)
foo := feed(t, f,
`CREATE CHANGEFEED WITH schema_change_policy='stop' AS SELECT 1 FROM foo WHERE crdb_internal.force_panic('wat') IS NULL`)
defer closeFeed(t, foo)
var err error
for err == nil {
_, err = foo.Next()
}
require.Error(t, err, "error while evaluating WHERE clause")
})

// Check that all panics while evaluating the SELECT clause in an expression are recovered from.
cdcTest(t, func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)
prep(t, sqlDB)
foo := feed(t, f,
`CREATE CHANGEFEED WITH schema_change_policy='stop' AS SELECT crdb_internal.force_panic('wat') FROM foo`)
defer closeFeed(t, foo)
var err error
for err == nil {
_, err = foo.Next()
}
require.Error(t, err, "error while evaluating SELECT clause")
})
}

func TestChangefeedPauseUnpause(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down

0 comments on commit e1d2103

Please sign in to comment.