diff --git a/pkg/ccl/changefeedccl/cdceval/expr_eval.go b/pkg/ccl/changefeedccl/cdceval/expr_eval.go index f4238779222c..73cd97a7530e 100644 --- a/pkg/ccl/changefeedccl/cdceval/expr_eval.go +++ b/pkg/ccl/changefeedccl/cdceval/expr_eval.go @@ -70,7 +70,12 @@ func (e *Evaluator) ComputeVirtualColumns(ctx context.Context, row *cdcevent.Row // MatchesFilter returns true if row matches evaluator filter expression. func (e *Evaluator) MatchesFilter( ctx context.Context, updatedRow cdcevent.Row, mvccTS hlc.Timestamp, prevRow cdcevent.Row, -) (bool, error) { +) (_ bool, err error) { + defer func() { + if pan := recover(); pan != nil { + err = errors.Newf("error while evaluating WHERE clause: %s", pan) + } + }() if e.where == nil { return true, nil } @@ -88,7 +93,12 @@ func (e *Evaluator) MatchesFilter( // Returns cdcevent.Row representing evalProjection. func (e *Evaluator) Projection( ctx context.Context, updatedRow cdcevent.Row, mvccTS hlc.Timestamp, prevRow cdcevent.Row, -) (cdcevent.Row, error) { +) (_ cdcevent.Row, err error) { + defer func() { + if pan := recover(); pan != nil { + err = errors.Newf("error while evaluating SELECT clause: %s", pan) + } + }() if len(e.selectors) == 0 { return updatedRow, nil } @@ -101,7 +111,12 @@ func (e *Evaluator) Projection( } // initSelectClause configures this evaluator to evaluate specified select clause. -func (e *Evaluator) initSelectClause(ctx context.Context, sc *tree.SelectClause) error { +func (e *Evaluator) initSelectClause(ctx context.Context, sc *tree.SelectClause) (err error) { + defer func() { + if pan := recover(); pan != nil { + err = errors.Newf("error while validating CHANGEFEED expression: %s", pan) + } + }() if len(sc.Exprs) == 0 { // Shouldn't happen, but be defensive. return pgerror.New(pgcode.InvalidParameterValue, "expected at least 1 projection") diff --git a/pkg/ccl/changefeedccl/cdceval/functions.go b/pkg/ccl/changefeedccl/cdceval/functions.go index 5683420de07f..f3045124953b 100644 --- a/pkg/ccl/changefeedccl/cdceval/functions.go +++ b/pkg/ccl/changefeedccl/cdceval/functions.go @@ -181,3 +181,10 @@ func makeStringSet(vals ...string) map[string]struct{} { } return m } + +// TestingEnableVolatileFunction allows functions with the given name (lowercase) +// to be used in expressions if their volatility level would disallow them by default. +// Used for testing. +func TestingEnableVolatileFunction(fnName string) { + supportedVolatileBuiltinFunctions[fnName] = struct{}{} +} diff --git a/pkg/ccl/changefeedccl/cdceval/validation.go b/pkg/ccl/changefeedccl/cdceval/validation.go index adee55be841f..931208ebdc8f 100644 --- a/pkg/ccl/changefeedccl/cdceval/validation.go +++ b/pkg/ccl/changefeedccl/cdceval/validation.go @@ -43,7 +43,12 @@ func NormalizeAndValidateSelectForTarget( sc *tree.SelectClause, includeVirtual bool, splitColFams bool, -) (n NormalizedSelectClause, _ jobspb.ChangefeedTargetSpecification, _ error) { +) (n NormalizedSelectClause, _ jobspb.ChangefeedTargetSpecification, retErr error) { + defer func() { + if pan := recover(); pan != nil { + retErr = errors.Newf("low-level error while normalizing expression, probably syntax is unsupported in CREATE CHANGEFEED: %s", pan) + } + }() execCtx.SemaCtx() execCfg := execCtx.ExecCfg() if !execCfg.Settings.Version.IsActive(ctx, clusterversion.EnablePredicateProjectionChangefeed) { diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 2ec90541a50d..77756b32a1c0 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -4522,6 +4522,55 @@ func TestChangefeedDescription(t *testing.T) { } +func TestChangefeedPanicRecovery(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + // Panics can mess with the test setup so run these each in their own test. + + prep := func(t *testing.T, sqlDB *sqlutils.SQLRunner) { + cdceval.TestingEnableVolatileFunction(`crdb_internal.force_panic`) + sqlDB.Exec(t, `CREATE TABLE foo(id int primary key, s string)`) + sqlDB.Exec(t, `INSERT INTO foo(id, s) VALUES (0, 'hello'), (1, null)`) + } + + cdcTest(t, func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + prep(t, sqlDB) + // Check that disallowed expressions have a good error message. + // Also regression test for https://github.com/cockroachdb/cockroach/issues/90416 + sqlDB.ExpectErr(t, "syntax is unsupported in CREATE CHANGEFEED", + `CREATE CHANGEFEED WITH schema_change_policy='stop' AS SELECT 1 FROM foo WHERE EXISTS (SELECT true)`) + }) + + // Check that all panics while evaluating the WHERE clause in an expression are recovered from. + cdcTest(t, func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + prep(t, sqlDB) + foo := feed(t, f, + `CREATE CHANGEFEED WITH schema_change_policy='stop' AS SELECT 1 FROM foo WHERE crdb_internal.force_panic('wat') IS NULL`) + defer closeFeed(t, foo) + var err error + for err == nil { + _, err = foo.Next() + } + require.Error(t, err, "error while evaluating WHERE clause") + }) + + // Check that all panics while evaluating the SELECT clause in an expression are recovered from. + cdcTest(t, func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + sqlDB := sqlutils.MakeSQLRunner(s.DB) + prep(t, sqlDB) + foo := feed(t, f, + `CREATE CHANGEFEED WITH schema_change_policy='stop' AS SELECT crdb_internal.force_panic('wat') FROM foo`) + defer closeFeed(t, foo) + var err error + for err == nil { + _, err = foo.Next() + } + require.Error(t, err, "error while evaluating SELECT clause") + }) +} + func TestChangefeedPauseUnpause(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t)