From 4f0af41a763f24a95eb60a862f62225bef81bc40 Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Thu, 11 Aug 2022 16:56:45 +0100 Subject: [PATCH 1/2] backupccl: immediately process resume spans during backup Previously, we put resume spans returned from an export request back on the queue for processing. In a large cluster with a lot of work to do, this might result in the resume span being processed much later. This isn't great because (1) it means we don't get to take advantage of disk and block caching and (2) it means that the resume span has a smaller chance of ending up in the same SST as the original span. Release note: None --- pkg/ccl/backupccl/backup_processor.go | 345 +++++++++++++------------- 1 file changed, 175 insertions(+), 170 deletions(-) diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index 0f6b11c3b970..4e46622bad4d 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -306,9 +306,11 @@ func runBackupProcessor( grp := ctxgroup.WithContext(ctx) // Start a goroutine that will then start a group of goroutines which each - // pull spans off of `todo` and send export requests. Any resume spans are put - // back on `todo`. Any returned SSTs are put on a `returnedSpansChan` to be routed - // to a buffered sink that merges them until they are large enough to flush. + // pull spans off of `todo` and send export requests. Any spans that encounter + // write intent errors during Export are put back on the todo queue for later + // processing. Any returned SSTs are put on a `returnedSpansChan` to be + // routed to a buffered sink that merges them until they are large enough to + // flush. grp.GoCtx(func(ctx context.Context) error { defer close(returnedSpansChan) // TODO(pbardea): Check to see if this benefits from any tuning (e.g. +1, or @@ -330,194 +332,197 @@ func runBackupProcessor( case <-ctxDone: return ctx.Err() case span := <-todo: - header := roachpb.Header{Timestamp: span.end} - - splitMidKey := splitKeysOnTimestamps.Get(&clusterSettings.SV) - // If we started splitting already, we must continue until we reach the end - // of split span. - if !span.firstKeyTS.IsEmpty() { - splitMidKey = true - } - - req := &roachpb.ExportRequest{ - RequestHeader: roachpb.RequestHeaderFromSpan(span.span), - ResumeKeyTS: span.firstKeyTS, - StartTime: span.start, - EnableTimeBoundIteratorOptimization: true, // NB: Must set for 22.1 compatibility. - MVCCFilter: spec.MVCCFilter, - TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV), - ReturnSST: true, - SplitMidKey: splitMidKey, - } + for len(span.span.Key) != 0 { + header := roachpb.Header{Timestamp: span.end} + + splitMidKey := splitKeysOnTimestamps.Get(&clusterSettings.SV) + // If we started splitting already, we must continue until we reach the end + // of split span. + if !span.firstKeyTS.IsEmpty() { + splitMidKey = true + } - // If we're doing re-attempts but are not yet in the priority regime, - // check to see if it is time to switch to priority. - if !priority && span.attempts > 0 { - // Check if this is starting a new pass and we should delay first. - // We're okay with delaying this worker until then since we assume any - // other work it could pull off the queue will likely want to delay to - // a similar or later time anyway. 
- if delay := delayPerAttmpt.Get(&clusterSettings.SV) - timeutil.Since(span.lastTried); delay > 0 { - timer.Reset(delay) - log.Infof(ctx, "waiting %s to start attempt %d of remaining spans", delay, span.attempts+1) - select { - case <-ctxDone: - return ctx.Err() - case <-timer.C: - timer.Read = true - } + req := &roachpb.ExportRequest{ + RequestHeader: roachpb.RequestHeaderFromSpan(span.span), + ResumeKeyTS: span.firstKeyTS, + StartTime: span.start, + EnableTimeBoundIteratorOptimization: true, // NB: Must set for 22.1 compatibility. + MVCCFilter: spec.MVCCFilter, + TargetFileSize: batcheval.ExportRequestTargetFileSize.Get(&clusterSettings.SV), + ReturnSST: true, + SplitMidKey: splitMidKey, } - priority = timeutil.Since(readTime) > priorityAfter.Get(&clusterSettings.SV) - } + // If we're doing re-attempts but are not yet in the priority regime, + // check to see if it is time to switch to priority. + if !priority && span.attempts > 0 { + // Check if this is starting a new pass and we should delay first. + // We're okay with delaying this worker until then since we assume any + // other work it could pull off the queue will likely want to delay to + // a similar or later time anyway. + if delay := delayPerAttmpt.Get(&clusterSettings.SV) - timeutil.Since(span.lastTried); delay > 0 { + timer.Reset(delay) + log.Infof(ctx, "waiting %s to start attempt %d of remaining spans", delay, span.attempts+1) + select { + case <-ctxDone: + return ctx.Err() + case <-timer.C: + timer.Read = true + } + } - if priority { - // This re-attempt is reading far enough in the past that we just want - // to abort any transactions it hits. - header.UserPriority = roachpb.MaxUserPriority - } else { - // On the initial attempt to export this span and re-attempts that are - // done while it is still less than the configured time above the read - // time, we set WaitPolicy to Error, so that the export will return an - // error to us instead of instead doing blocking wait if it hits any - // other txns. This lets us move on to other ranges we have to export, - // provide an indication of why we're blocked, etc instead and come - // back to this range later. - header.WaitPolicy = lock.WaitPolicy_Error - } + priority = timeutil.Since(readTime) > priorityAfter.Get(&clusterSettings.SV) + } - // We set the DistSender response target bytes field to a sentinel - // value. The sentinel value of 1 forces the ExportRequest to paginate - // after creating a single SST. - header.TargetBytes = 1 - admissionHeader := roachpb.AdmissionHeader{ - // Export requests are currently assigned NormalPri. - // - // TODO(dt): Consider linking this to/from the UserPriority field. 
-					Priority:                 int32(admissionpb.BulkNormalPri),
-					CreateTime:               timeutil.Now().UnixNano(),
-					Source:                   roachpb.AdmissionHeader_FROM_SQL,
-					NoMemoryReservedAtSource: true,
-				}
-				log.VEventf(ctx, 1, "sending ExportRequest for span %s (attempt %d, priority %s)",
-					span.span, span.attempts+1, header.UserPriority.String())
-				var rawResp roachpb.Response
-				var pErr *roachpb.Error
-				requestSentAt := timeutil.Now()
-				exportRequestErr := contextutil.RunWithTimeout(ctx,
-					fmt.Sprintf("ExportRequest for span %s", span.span),
-					timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error {
-						rawResp, pErr = kv.SendWrappedWithAdmission(
-							ctx, flowCtx.Cfg.DB.NonTransactionalSender(), header, admissionHeader, req)
-						if pErr != nil {
-							return pErr.GoError()
-						}
-						return nil
-					})
-				if exportRequestErr != nil {
-					if intentErr, ok := pErr.GetDetail().(*roachpb.WriteIntentError); ok {
-						span.lastTried = timeutil.Now()
-						span.attempts++
-						todo <- span
-						// TODO(dt): send a progress update to update job progress to note
-						// the intents being hit.
-						log.VEventf(ctx, 1, "retrying ExportRequest for span %s; encountered WriteIntentError: %s", span.span, intentErr.Error())
-						continue
+					if priority {
+						// This re-attempt is reading far enough in the past that we just want
+						// to abort any transactions it hits.
+						header.UserPriority = roachpb.MaxUserPriority
+					} else {
+						// On the initial attempt to export this span, and on re-attempts made
+						// while it is still less than the configured time above the read
+						// time, we set WaitPolicy to Error so that the export returns an
+						// error to us instead of blocking if it hits any other txns. This
+						// lets us move on to other ranges we have to export, provide an
+						// indication of why we're blocked, etc., and come back to this range
+						// later.
+						header.WaitPolicy = lock.WaitPolicy_Error
 					}
-					// TimeoutError improves the opaque `context deadline exceeded` error
-					// message so use that instead.
-					if errors.HasType(exportRequestErr, (*contextutil.TimeoutError)(nil)) {
-						return errors.Wrap(exportRequestErr, "export request timeout")
+
+				// We set the DistSender response target bytes field to a sentinel
+				// value. The sentinel value of 1 forces the ExportRequest to paginate
+				// after creating a single SST.
+				header.TargetBytes = 1
+				admissionHeader := roachpb.AdmissionHeader{
+					// Export requests are currently assigned NormalPri.
+					//
+					// TODO(dt): Consider linking this to/from the UserPriority field.
+					Priority:                 int32(admissionpb.BulkNormalPri),
+					CreateTime:               timeutil.Now().UnixNano(),
+					Source:                   roachpb.AdmissionHeader_FROM_SQL,
+					NoMemoryReservedAtSource: true,
 				}
-				// BatchTimestampBeforeGCError is returned if the ExportRequest
-				// attempts to read below the range's GC threshold.
-				if batchTimestampBeforeGCError, ok := pErr.GetDetail().(*roachpb.BatchTimestampBeforeGCError); ok {
-					// If the range we are exporting is marked to be excluded from
-					// backup, it is safe to ignore the error. It is likely that the
-					// table has been configured with a low GC TTL, and so the data
-					// the backup is targeting has already been gc'ed.
- if batchTimestampBeforeGCError.DataExcludedFromBackup { + log.VEventf(ctx, 1, "sending ExportRequest for span %s (attempt %d, priority %s)", + span.span, span.attempts+1, header.UserPriority.String()) + var rawResp roachpb.Response + var pErr *roachpb.Error + requestSentAt := timeutil.Now() + exportRequestErr := contextutil.RunWithTimeout(ctx, + fmt.Sprintf("ExportRequest for span %s", span.span), + timeoutPerAttempt.Get(&clusterSettings.SV), func(ctx context.Context) error { + rawResp, pErr = kv.SendWrappedWithAdmission( + ctx, flowCtx.Cfg.DB.NonTransactionalSender(), header, admissionHeader, req) + if pErr != nil { + return pErr.GoError() + } + return nil + }) + if exportRequestErr != nil { + if intentErr, ok := pErr.GetDetail().(*roachpb.WriteIntentError); ok { + span.lastTried = timeutil.Now() + span.attempts++ + todo <- span + // TODO(dt): send a progress update to update job progress to note + // the intents being hit. + log.VEventf(ctx, 1, "retrying ExportRequest for span %s; encountered WriteIntentError: %s", span.span, intentErr.Error()) + span = spanAndTime{} continue } + // TimeoutError improves the opaque `context deadline exceeded` error + // message so use that instead. + if errors.HasType(exportRequestErr, (*contextutil.TimeoutError)(nil)) { + return errors.Wrap(exportRequestErr, "export request timeout") + } + // BatchTimestampBeforeGCError is returned if the ExportRequest + // attempts to read below the range's GC threshold. + if batchTimestampBeforeGCError, ok := pErr.GetDetail().(*roachpb.BatchTimestampBeforeGCError); ok { + // If the range we are exporting is marked to be excluded from + // backup, it is safe to ignore the error. It is likely that the + // table has been configured with a low GC TTL, and so the data + // the backup is targeting has already been gc'ed. + if batchTimestampBeforeGCError.DataExcludedFromBackup { + span = spanAndTime{} + continue + } + } + return errors.Wrapf(exportRequestErr, "exporting %s", span.span) } - return errors.Wrapf(exportRequestErr, "exporting %s", span.span) - } - resp := rawResp.(*roachpb.ExportResponse) + resp := rawResp.(*roachpb.ExportResponse) - // If the reply has a resume span, put the remaining span on - // todo to be picked up again in the next round. - if resp.ResumeSpan != nil { - if !resp.ResumeSpan.Valid() { - return errors.Errorf("invalid resume span: %s", resp.ResumeSpan) - } + // If the reply has a resume span, we process it immediately. + var resumeSpan spanAndTime + if resp.ResumeSpan != nil { + if !resp.ResumeSpan.Valid() { + return errors.Errorf("invalid resume span: %s", resp.ResumeSpan) + } - resumeTS := hlc.Timestamp{} - // Taking resume timestamp from the last file of response since files must - // always be consecutive even if we currently expect only one. - if fileCount := len(resp.Files); fileCount > 0 { - resumeTS = resp.Files[fileCount-1].EndKeyTS - } - resumeSpan := spanAndTime{ - span: *resp.ResumeSpan, - firstKeyTS: resumeTS, - start: span.start, - end: span.end, - attempts: span.attempts, - lastTried: span.lastTried, + resumeTS := hlc.Timestamp{} + // Taking resume timestamp from the last file of response since files must + // always be consecutive even if we currently expect only one. 
+ if fileCount := len(resp.Files); fileCount > 0 { + resumeTS = resp.Files[fileCount-1].EndKeyTS + } + resumeSpan = spanAndTime{ + span: *resp.ResumeSpan, + firstKeyTS: resumeTS, + start: span.start, + end: span.end, + attempts: span.attempts, + lastTried: span.lastTried, + } } - todo <- resumeSpan - } - if backupKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok { - if backupKnobs.RunAfterExportingSpanEntry != nil { - backupKnobs.RunAfterExportingSpanEntry(ctx, resp) + if backupKnobs, ok := flowCtx.TestingKnobs().BackupRestoreTestingKnobs.(*sql.BackupRestoreTestingKnobs); ok { + if backupKnobs.RunAfterExportingSpanEntry != nil { + backupKnobs.RunAfterExportingSpanEntry(ctx, resp) + } } - } - var completedSpans int32 - if resp.ResumeSpan == nil { - completedSpans = 1 - } - - if len(resp.Files) > 1 { - log.Warning(ctx, "unexpected multi-file response using header.TargetBytes = 1") - } - - for i, file := range resp.Files { - entryCounts := countRows(file.Exported, spec.PKIDs) - - ret := exportedSpan{ - // BackupManifest_File just happens to contain the exact fields - // to store the metadata we need, but there's no actual File - // on-disk anywhere yet. - metadata: backuppb.BackupManifest_File{ - Span: file.Span, - Path: file.Path, - EntryCounts: entryCounts, - }, - dataSST: file.SST, - revStart: resp.StartTime, - atKeyBoundary: file.EndKeyTS.IsEmpty()} - if span.start != spec.BackupStartTime { - ret.metadata.StartTime = span.start - ret.metadata.EndTime = span.end - } - // If multiple files were returned for this span, only one -- the - // last -- should count as completing the requested span. - if i == len(resp.Files)-1 { - ret.completedSpans = completedSpans + var completedSpans int32 + if resp.ResumeSpan == nil { + completedSpans = 1 } - select { - case returnedSpansChan <- ret: - case <-ctxDone: - return ctx.Err() + + if len(resp.Files) > 1 { + log.Warning(ctx, "unexpected multi-file response using header.TargetBytes = 1") } - } - // Emit the stats for the processed ExportRequest. - recordExportStats(backupProcessorSpan, resp, timeutil.Since(requestSentAt)) + for i, file := range resp.Files { + entryCounts := countRows(file.Exported, spec.PKIDs) + + ret := exportedSpan{ + // BackupManifest_File just happens to contain the exact fields + // to store the metadata we need, but there's no actual File + // on-disk anywhere yet. + metadata: backuppb.BackupManifest_File{ + Span: file.Span, + Path: file.Path, + EntryCounts: entryCounts, + }, + dataSST: file.SST, + revStart: resp.StartTime, + atKeyBoundary: file.EndKeyTS.IsEmpty()} + if span.start != spec.BackupStartTime { + ret.metadata.StartTime = span.start + ret.metadata.EndTime = span.end + } + // If multiple files were returned for this span, only one -- the + // last -- should count as completing the requested span. + if i == len(resp.Files)-1 { + ret.completedSpans = completedSpans + } + select { + case returnedSpansChan <- ret: + case <-ctxDone: + return ctx.Err() + } + } + // Emit the stats for the processed ExportRequest. + recordExportStats(backupProcessorSpan, resp, timeutil.Since(requestSentAt)) + span = resumeSpan + } default: // No work left to do, so we can exit. 
Note that another worker could
				// still be running and may still push new work (a retry) on to todo but

From e0ce0fc350009c4d7151977cf818ff3529be495e Mon Sep 17 00:00:00 2001
From: Chengxiong Ruan
Date: Wed, 10 Aug 2022 15:43:06 -0400
Subject: [PATCH 2/2] sql: fix conflicting udf options validation

This commit fixes a bug where conflicting function options could be
accepted because we forgot to add option type names to the dedupe map.
A helper is added to handle the validation.

This commit also fixes another bug in ALTER FUNCTION where leakproof
could be set on a non-immutable function. A helper is also added so the
logic can be reused.

Release note: None

Release justification: Fixing a bug where conflicting function options
could be accepted. Also fixing a bug where leakproof could be set on a
non-immutable function with ALTER FUNCTION.
---
 pkg/sql/alter_function.go                  | 14 ++--
 pkg/sql/catalog/funcdesc/func_desc.go      | 19 ++++-
 pkg/sql/catalog/funcdesc/func_desc_test.go |  2 +-
 pkg/sql/create_function.go                 | 17 ++--
 pkg/sql/logictest/testdata/logic_test/udf  | 33 +++++++-
 pkg/sql/opt/optbuilder/create_function.go  | 10 +--
 pkg/sql/sem/tree/BUILD.bazel               |  1 +
 pkg/sql/sem/tree/udf.go                    | 47 +++++++++++
 pkg/sql/sem/tree/udf_test.go               | 92 ++++++++++++++++++++++
 9 files changed, 203 insertions(+), 32 deletions(-)
 create mode 100644 pkg/sql/sem/tree/udf_test.go

diff --git a/pkg/sql/alter_function.go b/pkg/sql/alter_function.go
index d452b2354388..36ee7e777edc 100644
--- a/pkg/sql/alter_function.go
+++ b/pkg/sql/alter_function.go
@@ -12,7 +12,6 @@ package sql

 import (
 	"context"
-	"reflect"

 	"github.com/cockroachdb/cockroach/pkg/security/username"
 	"github.com/cockroachdb/cockroach/pkg/sql/catalog"
@@ -64,19 +63,20 @@ func (n *alterFunctionOptionsNode) startExec(params runParams) error {
 	// referenced by other objects. This is needed when want to allow function
 	// references. Need to think about in what condition a function can be altered
 	// or not.
-	options := make(map[string]struct{})
+	if err := tree.ValidateFuncOptions(n.n.Options); err != nil {
+		return err
+	}
 	for _, option := range n.n.Options {
-		optTypeName := reflect.TypeOf(option).Name()
-		if _, ok := options[optTypeName]; ok {
-			return pgerror.New(pgcode.Syntax, "conflicting or redundant options")
-		}
 		// Note that language and function body cannot be altered, and it's blocked
 		// from parser level with "common_func_opt_item" syntax.
err := setFuncOption(params, fnDesc, option) if err != nil { return err } - options[optTypeName] = struct{}{} + } + + if err := funcdesc.CheckLeakProofVolatility(fnDesc); err != nil { + return err } return params.p.writeFuncSchemaChange(params.ctx, fnDesc) diff --git a/pkg/sql/catalog/funcdesc/func_desc.go b/pkg/sql/catalog/funcdesc/func_desc.go index b31f7ae4ed23..dd54520b199d 100644 --- a/pkg/sql/catalog/funcdesc/func_desc.go +++ b/pkg/sql/catalog/funcdesc/func_desc.go @@ -18,6 +18,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/catprivilege" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/parser" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" + "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/schemachanger/scpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/catid" @@ -198,9 +200,7 @@ func (desc *immutable) ValidateSelf(vea catalog.ValidationErrorAccumulator) { } } - if desc.LeakProof && desc.Volatility != catpb.Function_IMMUTABLE { - vea.Report(errors.AssertionFailedf("leakproof is set for non-immutable function")) - } + vea.Report(CheckLeakProofVolatility(desc)) for i, dep := range desc.DependedOnBy { if dep.ID == descpb.InvalidID { @@ -658,3 +658,16 @@ func UserDefinedFunctionOIDToID(oid oid.Oid) (descpb.ID, error) { func IsOIDUserDefinedFunc(oid oid.Oid) bool { return catid.IsOIDUserDefined(oid) } + +// CheckLeakProofVolatility returns an error when a function is defined as +// leakproof but not immutable. See more details in comments for volatility.V. +func CheckLeakProofVolatility(fn catalog.FunctionDescriptor) error { + if fn.GetLeakProof() && fn.GetVolatility() != catpb.Function_IMMUTABLE { + return pgerror.Newf( + pgcode.InvalidFunctionDefinition, + "cannot set leakproof on function with non-immutable volatility: %s", + fn.GetVolatility().String(), + ) + } + return nil +} diff --git a/pkg/sql/catalog/funcdesc/func_desc_test.go b/pkg/sql/catalog/funcdesc/func_desc_test.go index ed6f075ee2d6..2412347bf344 100644 --- a/pkg/sql/catalog/funcdesc/func_desc_test.go +++ b/pkg/sql/catalog/funcdesc/func_desc_test.go @@ -177,7 +177,7 @@ func TestValidateFuncDesc(t *testing.T) { }, }, { - "leakproof is set for non-immutable function", + "cannot set leakproof on function with non-immutable volatility: VOLATILE", descpb.FunctionDescriptor{ Name: "f", ID: funcDescID, diff --git a/pkg/sql/create_function.go b/pkg/sql/create_function.go index 28609f896c4c..4df4011ca439 100644 --- a/pkg/sql/create_function.go +++ b/pkg/sql/create_function.go @@ -91,12 +91,8 @@ func (n *createFunctionNode) createNewFunction( return err } } - if udfDesc.LeakProof && udfDesc.Volatility != catpb.Function_IMMUTABLE { - return pgerror.Newf( - pgcode.InvalidFunctionDefinition, - "cannot create leakproof function with non-immutable volatility: %s", - udfDesc.Volatility.String(), - ) + if err := funcdesc.CheckLeakProofVolatility(udfDesc); err != nil { + return err } if err := n.addUDFReferences(udfDesc, params); err != nil { @@ -167,12 +163,9 @@ func (n *createFunctionNode) replaceFunction(udfDesc *funcdesc.Mutable, params r return err } } - if udfDesc.LeakProof && udfDesc.Volatility != catpb.Function_IMMUTABLE { - return pgerror.Newf( - pgcode.InvalidFunctionDefinition, - "cannot create leakproof function with non-immutable volatility: %s", - udfDesc.Volatility.String(), - ) + + if err := funcdesc.CheckLeakProofVolatility(udfDesc); err != 
nil { + return err } // Removing all existing references before adding new references. diff --git a/pkg/sql/logictest/testdata/logic_test/udf b/pkg/sql/logictest/testdata/logic_test/udf index fa662480abe7..7dcaf71a7a8a 100644 --- a/pkg/sql/logictest/testdata/logic_test/udf +++ b/pkg/sql/logictest/testdata/logic_test/udf @@ -4,12 +4,36 @@ a INT PRIMARY KEY, b INT ) -statement error pq: cannot create leakproof function with non-immutable volatility: STABLE +statement error pq: cannot set leakproof on function with non-immutable volatility: STABLE CREATE FUNCTION f(a int) RETURNS INT LEAKPROOF STABLE LANGUAGE SQL AS 'SELECT 1' statement error pq: return type mismatch in function declared to return int\nDETAIL: Actual return type is string CREATE FUNCTION f() RETURNS INT IMMUTABLE LANGUAGE SQL AS $$ SELECT 'hello' $$ +statement error pq: STABLE: conflicting or redundant options +CREATE FUNCTION f() RETURNS INT IMMUTABLE STABLE LANGUAGE SQL AS $$ SELECT 1 $$; + +statement error pq: STRICT: conflicting or redundant options +CREATE FUNCTION f() RETURNS INT CALLED ON NULL INPUT STABLE STRICT LANGUAGE SQL AS $$ SELECT 1 $$; + +statement error pq: RETURNS NULL ON NULL INPUT: conflicting or redundant options +CREATE FUNCTION f() RETURNS INT CALLED ON NULL INPUT STABLE RETURNS NULL ON NULL INPUT LANGUAGE SQL AS $$ SELECT 1 $$; + +statement error pq: NOT LEAKPROOF: conflicting or redundant options +CREATE FUNCTION f() RETURNS INT LEAKPROOF NOT LEAKPROOF LANGUAGE SQL AS $$ SELECT 1 $$; + +statement error pq: AS \$\$ SELECT 2 \$\$: conflicting or redundant options +CREATE FUNCTION f() RETURNS INT IMMUTABLE LANGUAGE SQL AS $$ SELECT 1 $$ AS $$ SELECT 2 $$; + +statement error pq: LANGUAGE SQL: conflicting or redundant options +CREATE FUNCTION f() RETURNS INT IMMUTABLE LANGUAGE SQL LANGUAGE SQL AS $$ SELECT 1 $$; + +statement error pq: no language specified +CREATE FUNCTION f() RETURNS INT IMMUTABLE AS $$ SELECT 1 $$; + +statement error pq: no function body specified +CREATE FUNCTION f() RETURNS INT IMMUTABLE LANGUAGE SQL; + statement ok CREATE FUNCTION a(i INT) RETURNS INT LANGUAGE SQL AS 'SELECT i' @@ -1017,9 +1041,12 @@ CREATE FUNCTION public.f_test_alter_opt(IN INT8) SELECT 1; $$ -statement error pq: conflicting or redundant options +statement error pq: IMMUTABLE: conflicting or redundant options ALTER FUNCTION f_test_alter_opt IMMUTABLE IMMUTABLE +statement error pq: cannot set leakproof on function with non-immutable volatility: STABLE +ALTER FUNCTION f_test_alter_opt STABLE LEAKPROOF + statement ok ALTER FUNCTION f_test_alter_opt IMMUTABLE LEAKPROOF STRICT; @@ -1370,7 +1397,7 @@ CREATE OR REPLACE FUNCTION f_test_cor(a INT, b INT) RETURNS STRING IMMUTABLE LAN statement error pq: cannot change return type of existing function CREATE OR REPLACE FUNCTION f_test_cor(a INT, b INT) RETURNS SETOF INT IMMUTABLE LANGUAGE SQL AS $$ SELECT 1 $$; -statement error pq: cannot create leakproof function with non-immutable volatility: VOLATILE +statement error pq: cannot set leakproof on function with non-immutable volatility: VOLATILE CREATE OR REPLACE FUNCTION f_test_cor(a INT, b INT) RETURNS INT LEAKPROOF LANGUAGE SQL AS $$ SELECT 1 $$; query T diff --git a/pkg/sql/opt/optbuilder/create_function.go b/pkg/sql/opt/optbuilder/create_function.go index ccaa7b6efd6f..a05c3f8e2b28 100644 --- a/pkg/sql/opt/optbuilder/create_function.go +++ b/pkg/sql/opt/optbuilder/create_function.go @@ -11,8 +11,6 @@ package optbuilder import ( - "reflect" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" 
"github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/memo" @@ -61,16 +59,16 @@ func (b *Builder) buildCreateFunction(cf *tree.CreateFunction, inScope *scope) ( panic(unimplemented.New("CREATE FUNCTION sql_body", "CREATE FUNCTION...sql_body unimplemented")) } + if err := tree.ValidateFuncOptions(cf.Options); err != nil { + panic(err) + } + // Look for function body string from function options. // Note that function body can be an empty string. funcBodyFound := false languageFound := false var funcBodyStr string - options := make(map[string]struct{}) for _, option := range cf.Options { - if _, ok := options[reflect.TypeOf(option).Name()]; ok { - panic(pgerror.New(pgcode.Syntax, "conflicting or redundant options")) - } switch opt := option.(type) { case tree.FunctionBodyStr: funcBodyFound = true diff --git a/pkg/sql/sem/tree/BUILD.bazel b/pkg/sql/sem/tree/BUILD.bazel index e93dfb6f208f..a65b27d1ea8b 100644 --- a/pkg/sql/sem/tree/BUILD.bazel +++ b/pkg/sql/sem/tree/BUILD.bazel @@ -198,6 +198,7 @@ go_test( "type_check_test.go", "type_name_test.go", "typing_test.go", + "udf_test.go", "var_expr_test.go", ], data = glob(["testdata/**"]) + ["//pkg/sql/parser:sql.y"], diff --git a/pkg/sql/sem/tree/udf.go b/pkg/sql/sem/tree/udf.go index 8def87827e38..88f550de9c54 100644 --- a/pkg/sql/sem/tree/udf.go +++ b/pkg/sql/sem/tree/udf.go @@ -21,6 +21,11 @@ import ( "github.com/cockroachdb/errors" ) +// ErrConflictingFunctionOption indicates that there are conflicting or +// redundant function options from user input to either create or alter a +// function. +var ErrConflictingFunctionOption = pgerror.New(pgcode.Syntax, "conflicting or redundant options") + // FunctionName represent a function name in a UDF relevant statement, either // DDL or DML statement. Similar to TableName, it is constructed for incoming // SQL queries from an UnresolvedObjectName. @@ -501,3 +506,45 @@ func MaybeFailOnUDFUsage(expr TypedExpr) error { } return nil } + +// ValidateFuncOptions checks whether there are conflicting or redundant +// function options in the given slice. +func ValidateFuncOptions(options FunctionOptions) error { + var hasLang, hasBody, hasLeakProof, hasVolatility, hasNullInputBehavior bool + err := func(opt FunctionOption) error { + return errors.Wrapf(ErrConflictingFunctionOption, "%s", AsString(opt)) + } + for _, option := range options { + switch option.(type) { + case FunctionLanguage: + if hasLang { + return err(option) + } + hasLang = true + case FunctionBodyStr: + if hasBody { + return err(option) + } + hasBody = true + case FunctionLeakproof: + if hasLeakProof { + return err(option) + } + hasLeakProof = true + case FunctionVolatility: + if hasVolatility { + return err(option) + } + hasVolatility = true + case FunctionNullInputBehavior: + if hasNullInputBehavior { + return err(option) + } + hasNullInputBehavior = true + default: + return pgerror.Newf(pgcode.InvalidParameterValue, "unknown function option: ", AsString(option)) + } + } + + return nil +} diff --git a/pkg/sql/sem/tree/udf_test.go b/pkg/sql/sem/tree/udf_test.go new file mode 100644 index 000000000000..0a4d7c9642c9 --- /dev/null +++ b/pkg/sql/sem/tree/udf_test.go @@ -0,0 +1,92 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tree_test + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func TestConflictingFunctionOptions(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testCases := []struct { + testName string + options tree.FunctionOptions + expectedErr string + }{ + { + testName: "no conflict", + options: tree.FunctionOptions{ + tree.FunctionVolatile, tree.FunctionLeakproof(true), tree.FunctionCalledOnNullInput, tree.FunctionLangSQL, tree.FunctionBodyStr("hi"), + }, + expectedErr: "", + }, + { + testName: "volatility conflict", + options: tree.FunctionOptions{ + tree.FunctionVolatile, tree.FunctionStable, + }, + expectedErr: "STABLE: conflicting or redundant options", + }, + { + testName: "null input behavior conflict 1", + options: tree.FunctionOptions{ + tree.FunctionCalledOnNullInput, tree.FunctionReturnsNullOnNullInput, + }, + expectedErr: "RETURNS NULL ON NULL INPUT: conflicting or redundant options", + }, + { + testName: "null input behavior conflict 2", + options: tree.FunctionOptions{ + tree.FunctionCalledOnNullInput, tree.FunctionStrict, + }, + expectedErr: "STRICT: conflicting or redundant options", + }, + { + testName: "leakproof conflict", + options: tree.FunctionOptions{ + tree.FunctionLeakproof(true), tree.FunctionLeakproof(false), + }, + expectedErr: "NOT LEAKPROOF: conflicting or redundant options", + }, + { + testName: "language conflict", + options: tree.FunctionOptions{ + tree.FunctionLangSQL, tree.FunctionLangSQL, + }, + expectedErr: "LANGUAGE SQL: conflicting or redundant options", + }, + { + testName: "function body conflict", + options: tree.FunctionOptions{ + tree.FunctionBodyStr("queries"), tree.FunctionBodyStr("others"), + }, + expectedErr: "AS $$others$$: conflicting or redundant options", + }, + } + + for _, tc := range testCases { + t.Run(tc.testName, func(t *testing.T) { + err := tree.ValidateFuncOptions(tc.options) + if tc.expectedErr == "" { + require.NoError(t, err) + return + } + require.Equal(t, tc.expectedErr, err.Error()) + }) + } +}
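
The core change in the first patch is easy to lose in the re-indentation noise: instead of pushing a resume span back onto the shared todo channel, the worker now loops on the span until nothing remains. A minimal, runnable Go sketch of that pattern follows; span and export are illustrative stand-ins for spanAndTime and the paginated ExportRequest, not the actual backupccl API:

    package main

    import "fmt"

    // span is a stand-in for the patch's spanAndTime; the real struct also
    // carries timestamps and retry bookkeeping.
    type span struct{ start, end int }

    func (s span) empty() bool { return s.start >= s.end }

    // export stands in for one paginated ExportRequest: it handles at most
    // limit keys and hands back the remainder as a resume span.
    func export(s span, limit int) (done, resume span) {
    	if s.end-s.start <= limit {
    		return s, span{}
    	}
    	return span{s.start, s.start + limit}, span{s.start + limit, s.end}
    }

    func main() {
    	work := span{start: 0, end: 10}
    	// The patch's pattern: drain the resume span on the same worker until
    	// it is empty, rather than re-queueing it on the shared todo channel
    	// where it might not be picked up until much later (cold caches, and
    	// less chance of landing in the same SST).
    	for !work.empty() {
    		var done span
    		done, work = export(work, 3)
    		fmt.Printf("exported %+v, resume %+v\n", done, work)
    	}
    }

The write-intent retry path is the one case that still re-queues, which is why the real loop clears the span (span = spanAndTime{}) before continuing.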
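The second patch's consolidation can be sketched the same way. A rough, self-contained approximation of the shape of tree.ValidateFuncOptions, one flag per option kind flipped in a type switch, using simplified stand-in types rather than the real tree.FunctionOption variants:

    package main

    import (
    	"errors"
    	"fmt"
    )

    // option, volatility, and language are simplified stand-ins for the
    // tree.FunctionOption interface and two of its variants.
    type option interface{ fmt.Stringer }

    type volatility string

    func (v volatility) String() string { return string(v) }

    type language string

    func (l language) String() string { return "LANGUAGE " + string(l) }

    var errConflict = errors.New("conflicting or redundant options")

    // validate rejects a second option of the same kind, so IMMUTABLE
    // followed by STABLE is caught even though the two values differ.
    func validate(opts []option) error {
    	var hasVolatility, hasLanguage bool
    	for _, o := range opts {
    		switch o.(type) {
    		case volatility:
    			if hasVolatility {
    				return fmt.Errorf("%s: %w", o, errConflict)
    			}
    			hasVolatility = true
    		case language:
    			if hasLanguage {
    				return fmt.Errorf("%s: %w", o, errConflict)
    			}
    			hasLanguage = true
    		}
    	}
    	return nil
    }

    func main() {
    	// Prints "STABLE: conflicting or redundant options", the same error
    	// shape the patch's logic tests assert on.
    	fmt.Println(validate([]option{volatility("IMMUTABLE"), volatility("STABLE")}))
    }

Keying the check on option kind rather than option value is what lets one helper serve CREATE FUNCTION, CREATE OR REPLACE, and ALTER FUNCTION alike.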