Feedback request for providing configurable UDF functions #10744

Closed · Omega359 opened this issue May 31, 2024 · 15 comments
Labels: enhancement (New feature or request)

Omega359 (Contributor) commented May 31, 2024

Is your feature request related to a problem or challenge?

While working on adding a 'safe' mode to the to_timestamp and to_date UDF functions, I've come across an issue that I would like feedback on before proceeding.

The feature

Currently, if a source string cannot be parsed using any of the provided chrono formats, timestamp and date parsing in datafusion returns an error. This is normal for a database-type solution; however, it is not ideal for a system that parses billions of dates in batches, some of which are human-entered. Systems such as Spark default to a null value for anything that cannot be parsed, and this feature enables a mode ('safe', to mirror the name and behaviour of CastOptions) that gives the to_timestamp* and to_date UDFs the same behaviour (return null on error).

The problem

Since UDFs are given no context, and there isn't a way I know of to statically access configuration to add the above-mentioned functionality, I resorted to adding a new constructor function to allow the UDF to switch behaviour:

#[derive(Debug)]
pub struct ToTimestampFunc {
    signature: Signature,
    /// how to handle cast or parsing failures, either return NULL (safe=true) or return ERR (safe=false)
    pub safe: bool,
}

impl ToTimestampFunc {
    pub fn new() -> Self {
        Self {
            signature: Signature::variadic_any(Volatility::Immutable),
            safe: false,
        }
    }

    pub fn new_with_safe(safe: bool) -> Self {
        Self {
            signature: Signature::variadic_any(Volatility::Immutable),
            safe,
        }
    }
}

Using the alternative 'safe' mode for these functions is then as simple as:

let to_timestamp = ToTimestampFunc::new_with_safe(true);
session_context.register_udf(ScalarUDF::new_from_impl(to_timestamp));

Unfortunately this only affects SQL queries: any call to the to_timestamp(args: Vec<Expr>) expression function will not use the new definition registered in the function registry. This is because that function, and every other function like it, uses a static singleton instance that is initialized with a ::new() call, and there is no way I can see to replace that instance.
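For context, the expression functions in datafusion-functions are generated by a macro that caches a single default instance, roughly like the sketch below (simplified; the exact generated code and names vary by version, and ToTimestampFunc is the struct from the snippet above):

use std::sync::{Arc, OnceLock};
use datafusion::logical_expr::{Expr, ScalarUDF};

static TO_TIMESTAMP: OnceLock<Arc<ScalarUDF>> = OnceLock::new();

/// Returns the cached default instance -- always built with `new()`,
/// so a 'safe' instance registered on the SessionContext is never seen here.
pub fn to_timestamp_udf() -> Arc<ScalarUDF> {
    TO_TIMESTAMP
        .get_or_init(|| Arc::new(ScalarUDF::new_from_impl(ToTimestampFunc::new())))
        .clone()
}

pub fn to_timestamp(args: Vec<Expr>) -> Expr {
    to_timestamp_udf().call(args)
}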

Describe the solution you'd like

I see a few possible solutions to this:

  • Acknowledge the difference in behaviour in documentation and recommend using ctx.udf("to_timestamp").unwrap().call(args) instead of the to_timestamp() function anytime 'safe' mode is required. This is less than ideal imho as it can lead to confusion and unintuitive behavior.
  • Provide a mechanism (perhaps an argument to invoke) to supply a SessionContext to all UDFs so that they can adjust behaviour based on configuration at call time.
  • Update the to_timestamp() function and all similar functions to have alternatives that do not use the statically defined version of the UDF but rather create it on-demand or use the definition saved in the session state. As examples:
    // the existing function    
    pub fn to_timestamp(args: Vec<Expr>) -> Expr {
        super::to_timestamp().call(args)
    }

    pub fn to_timestamp_safe(args: Vec<Expr>) -> Expr {
        ScalarUDF::new_from_impl(ToTimestampFunc::new_with_safe(true)).call(args)
    }

    // or
    pub fn to_timestamp_from_context(ctx: &Arc<SessionContext>, args: Vec<Expr>) -> Expr {
        ctx.udf("to_timestamp").unwrap().call(args)
    }

   // varargs would have been a really nice solution as well but 🤷 
  • something else I haven't thought of.

Any opinions, suggestions and critiques would be welcome.

Describe alternatives you've considered

No response

Additional context

No response

@Omega359 Omega359 added the enhancement New feature or request label May 31, 2024
@Omega359 Omega359 changed the title Switch UDF function expression definitions from static instance calls to use function_registry for lookup Feedback request for providing configurable UDF functions May 31, 2024
alamb (Contributor) commented May 31, 2024

I think options 1 and 3 would be straightforward

You could even potentially implement

pub fn to_timestamp_safe(args: Vec<Expr>) -> Expr {
...
}

Directly in your application (rather than in the core of datafusion)

Another crazy thought might be to implement a rewrite pass (e.g., an AnalyzerRule) that rewrites all expressions in the plan when safe mode is needed... I think those rules have access to all the state necessary.
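A rough sketch of what such a rule could look like, assuming the TreeNode rewrite APIs from recent DataFusion versions (SafeModeRewrite is an illustrative name, and ToTimestampFunc is the struct from the issue description; treat this as a sketch, not a tested implementation):

use std::sync::Arc;

use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::common::Result;
use datafusion::config::ConfigOptions;
use datafusion::logical_expr::expr::ScalarFunction;
use datafusion::logical_expr::{Expr, LogicalPlan, ScalarUDF};
use datafusion::optimizer::analyzer::AnalyzerRule;

#[derive(Debug)]
struct SafeModeRewrite;

impl AnalyzerRule for SafeModeRewrite {
    fn name(&self) -> &str {
        "safe_mode_rewrite"
    }

    fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> Result<LogicalPlan> {
        // One 'safe' instance, shared by all rewritten call sites.
        let safe = Arc::new(ScalarUDF::new_from_impl(ToTimestampFunc::new_with_safe(true)));
        plan.transform_up(|node| {
            // Rewrite every expression in this plan node.
            node.map_expressions(|expr| {
                expr.transform_up(|e| match e {
                    Expr::ScalarFunction(ScalarFunction { func, args })
                        if func.name() == "to_timestamp" =>
                    {
                        Ok(Transformed::yes(Expr::ScalarFunction(
                            ScalarFunction::new_udf(Arc::clone(&safe), args),
                        )))
                    }
                    other => Ok(Transformed::no(other)),
                })
            })
        })
        .map(|t| t.data)
    }
}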

alamb (Contributor) commented May 31, 2024

I think the key thing to figure out is "will safemode to_timestamp be part of the datafusion core"?

Maybe it is time to make a datafusion-functions-scalar-spark or something that has functions with the Spark behaviors 🤔

jayzhan211 (Contributor) commented Jun 1, 2024

I think it is possible to extend safe_mode into the datafusion core as you mentioned; it should be similar to the distinct mode for aggregate functions. We can have different behavior based on whether safe_mode is set.

For the expression API, we could:

  1. Introduce to_timestamp_safe()
  2. Extend to_timestamp(args, safe: bool)
  3. Introduce a builder mode to avoid a breaking change: to_timestamp(args).safe().build()

I prefer the third one.

Also, there are many to_timestamp functions with different time units. I'm not sure why they are split into different functions; they could be collapsed into a single to_timestamp function that casts to the appropriate time unit based on the given argument.

We could have to_timestamp(args).time_unit(Milli).safe().build() if we need a millisecond timestamp, and to_timestamp(args).safe().build() for Nanosecond.
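A minimal sketch of what that builder could look like (all names here are hypothetical, nothing like this exists in datafusion today; time-unit handling is elided, and ToTimestampFunc is the struct from the issue description):

use datafusion::logical_expr::{Expr, ScalarUDF};

/// Hypothetical builder for the expression API.
pub struct ToTimestampBuilder {
    args: Vec<Expr>,
    safe: bool,
}

pub fn to_timestamp(args: Vec<Expr>) -> ToTimestampBuilder {
    ToTimestampBuilder { args, safe: false }
}

impl ToTimestampBuilder {
    /// Return NULL instead of an error on parse failure.
    pub fn safe(mut self) -> Self {
        self.safe = true;
        self
    }

    /// Materialize the configured call as an `Expr`.
    pub fn build(self) -> Expr {
        ScalarUDF::new_from_impl(ToTimestampFunc::new_with_safe(self.safe)).call(self.args)
    }
}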

"will safemode to_timestamp be part of the datafusion core"

It is an interesting question; we already tend to implement functions based on other DBs in the first place.

For example, we usually follow postgres, duckdb, and others.

We could have functions-postgres, functions-duckdb, and functions-spark crates that aim to mirror the behavior of other databases, and we wouldn't even need the functions crate (we could keep it for backward compatibility). They would be extension crates implemented by third parties; datafusion would not need to implement any datafusion-specific functions (and we would prefer not to), it would just need to make sure the core is compatible with the different functions crates. And we could register the most-used functions in datafusion for the end-to-end SQL workflow!

jayzhan211 (Contributor) commented:
We can have functions-postgres, functions-duckdb and functions-spark

Most functions have the same behavior across different DBs, so we could also implement the differing functions in a single functions crate but register the expected functions based on the DB we want to mimic. Instead of having a default function set for the datafusion SQL workflow, we could have spark_functions and postgres_functions that register different to_timestamp functions based on the configuration we set.
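As an illustration, registration helpers along these lines could select the flavour at session setup (register_spark_functions and register_postgres_functions are hypothetical names, reusing ToTimestampFunc from the issue description):

use datafusion::execution::context::SessionContext;
use datafusion::logical_expr::ScalarUDF;

/// Spark-flavoured registration: to_timestamp returns NULL on parse failure.
fn register_spark_functions(ctx: &SessionContext) {
    ctx.register_udf(ScalarUDF::new_from_impl(ToTimestampFunc::new_with_safe(true)));
}

/// Postgres-flavoured registration: to_timestamp errors on parse failure.
fn register_postgres_functions(ctx: &SessionContext) {
    ctx.register_udf(ScalarUDF::new_from_impl(ToTimestampFunc::new()));
}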

Omega359 (Contributor, Author) commented Jun 1, 2024

We can have functions-postgres, functions-duckdb and functions-spark

Most functions have the same behavior across different DBs, so we could also implement the differing functions in a single functions crate but register the expected functions based on the DB we want to mimic. Instead of having a default function set for the datafusion SQL workflow, we could have spark_functions and postgres_functions that register different to_timestamp functions based on the configuration we set.

@andygrove are there UDFs already in the Comet project that handle Spark-specific behaviour? If so, is that a separate project or currently embedded in Comet? (I haven't looked at that codebase myself since the initial drop.)

Omega359 (Contributor, Author) commented Jun 3, 2024

The one issue with moving this functionality into a spark module is that, for that to really be valid, the formats would have to be Spark-compatible, which they currently are not. I do not have the spare time in the near future to implement a parser to do that.

andygrove (Member) commented Jun 4, 2024

@Omega359 so far we have been implementing custom PhysicalExpr directly in the datafusion-comet project as needed for Spark-specific behavior, with support for Spark's different evaluation modes (legacy, try, ansi), and we are using fuzz testing to ensure compatibility across multiple Spark versions.

I think we need to have the discussion of whether it makes sense to upstream these into the core datafusion project or not, or whether we publish a datafusion-spark-compat crate from Comet, or some other option.

andygrove (Member) commented:
The one issue with moving this functionality into a spark module is that, for that to really be valid, the formats would have to be Spark-compatible, which they currently are not. I do not have the spare time in the near future to implement a parser to do that.

We are porting Spark parsing logic as part of Comet.

Omega359 (Contributor, Author) commented Jun 4, 2024

I think we need to have the discussion of whether it makes sense to upstream these into the core datafusion project or not, or whether we publish a datafusion-spark-compat crate from Comet, or some other option.

Thank you for chiming in. While I wouldn't mind Spark compatibility, it really isn't the focus of this request, as I've already converted all the Spark expressions and function usages to DataFusion-compatible ones. It's the general system behaviour that I would like to address: being able to switch from a db-focused perspective (fail fast) to a processing-engine one (nominally lenient: return null) for some (or all) of the UDFs.

If the general consensus is to separate out this desired behaviour, then I would think a separate crate might be the best approach. However, from searching the issues here, there seems to have been past discussion of how to mirror the behaviour of other databases, and that also includes SQL syntax, so it's not quite as simple as having a db-specific crate full of UDFs and calling it a day.

andygrove (Member) commented Jun 5, 2024

I have read the context now and understand that this is about safe mode or what Spark calls ANSI mode.

Isn't this just a case of adding a new flag to the session context that UDFs can choose to use when deciding whether to return null or throw an error?

Omega359 (Contributor, Author) commented Jun 5, 2024

I have read the context now and understand that this is about safe mode or what Spark calls ANSI mode.

Isn't this just a case of adding a new flag to the session context that UDFs can choose to use when deciding whether to return null or throw an error?

That would be nice ... except UDFs don't currently have a way to access the session context :( Options 2 and 3 provide that via different mechanisms.

alamb (Contributor) commented Jun 6, 2024

I wonder if we could take a page from what @jayzhan211 is implementing in #10560 and go with a trait

So we could implement something like

let expr = to_timestamp(lit("2021-01-01"))
  // set the to_timestamp mode to "safe" 
  .safe();

I realize that this would require changing the callsites so maybe it isn't viable
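One way that idea could be spelled out is as a hypothetical extension trait on Expr (SafeExprExt is an invented name; this rebuilds a to_timestamp call against a safe instance of the ToTimestampFunc from the issue description and leaves other expressions untouched):

use datafusion::logical_expr::expr::ScalarFunction;
use datafusion::logical_expr::{Expr, ScalarUDF};

pub trait SafeExprExt {
    /// Switch a `to_timestamp` call to the 'safe' (NULL-on-error) variant.
    fn safe(self) -> Expr;
}

impl SafeExprExt for Expr {
    fn safe(self) -> Expr {
        match self {
            Expr::ScalarFunction(ScalarFunction { func, args })
                if func.name() == "to_timestamp" =>
            {
                ScalarUDF::new_from_impl(ToTimestampFunc::new_with_safe(true)).call(args)
            }
            other => other,
        }
    }
}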

Omega359 (Contributor, Author) commented Jun 7, 2024

After thinking about this a fair bit, the builder approach like what @jayzhan211 did with aggregate functions seems to be the best way forward for this feature, imho. While I do like the idea of separate crate(s) for mirroring functionality from other systems, I think that is a much, much larger project encompassing far more than this specific feature entails. I don't believe putting this feature into core prevents DF from later extracting this and other similar behaviour 'traits' and functionality into system-specific crates.

I'll start work on this and see how it works out. If it does, I'll add safe support via a trait to the to_timestamp*, to_date, and to_unixtime functions. If there are other UDFs that could benefit from having a 'safe' mode (return null on error), please let me know and I'll see about adding safe mode to those as well.

Thank you everyone for your feedback and guidance on this feedback request! 👍

alamb (Contributor) commented Jun 9, 2024

We just merged the aggregate builder in #10560 -- I am quite happy with how it turned out, in case you want to take a friendly look

Omega359 (Contributor, Author) commented Jun 10, 2024

After attempting to implement the builder approach, it became apparent to me that it would touch too many things and really won't work well without changing the signature of ScalarUDFImpl anyway. It works for aggregate functions because the functions defined in the AggregateUDFImpl trait take arguments through which the additional information (distinct, sort, ordering, etc.) is provided to the UDF implementation. In the case of ScalarUDFImpl that is not the case.
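For reference, a simplified contrast of the two traits (signatures abridged and renamed to make clear these are illustrations from roughly the DataFusion 39 era; check the current docs for the exact shapes):

use datafusion::common::Result;
use datafusion::logical_expr::function::AccumulatorArgs;
use datafusion::logical_expr::{Accumulator, ColumnarValue};

pub trait ScalarUDFImplAbridged {
    // invoke only sees the argument values: no session state, no flags,
    // so there is nowhere to thread a 'safe' setting through.
    fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue>;
}

pub trait AggregateUDFImplAbridged {
    // the accumulator is built from AccumulatorArgs, which carries extra
    // call-site information (ordering, distinctness, schema, ...).
    fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>>;
}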

After some more thought, I think the cleanest approach may be to add a get_config_options function to the SimplifyInfo trait and a scalar_udf_safe_mode: bool, default = false option to the ExecutionOptions struct. Doing that would allow functions that require configuration (including, but obviously not limited to, the 'safe' mode I'm working on) to access it while changing trait signatures as little as possible.

err, scratch that. Onto the next idea :/
