Add Snowpark Operator that allows execution of Snowflake Snowpark code #24456

Closed
1 of 2 tasks
sfc-gh-madkins opened this issue Jun 14, 2022 · 47 comments · Fixed by #42457
Labels: area:providers, good first issue, kind:feature (Feature Requests), provider:snowflake (Issues related to Snowflake provider)

Comments

@sfc-gh-madkins
Contributor

Description

A Snowpark operator (similar to the SnowflakeOperator) that executes Snowpark for Python code. Credentials are handled the same way as for the SnowflakeOperator; code is executed the same way as with the PythonOperator.

Use case/motivation

I would like to be able to schedule snowflake-snowpark-python code that I have developed. Instead of having to write lengthy SQL statements, I would prefer to develop in my language of choice.

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@sfc-gh-madkins added the kind:feature (Feature Requests) label Jun 14, 2022
@boring-cyborg

boring-cyborg bot commented Jun 14, 2022

Thanks for opening your first issue here! Be sure to follow the issue template!

@potiuk
Member

potiuk commented Jun 15, 2022

Since you are from Snowflake, @sfc-gh-madkins - would you like to contribute it? We are generally moving in a direction where stakeholders get more responsibility for adding and maintaining their providers (when there are strong stakeholders behind an operator), so there will be more and more of an expectation that stakeholders in a provider submit and maintain more changes to their providers: https://lists.apache.org/thread/6ngq79df7op541gfwntspdtsvzlv1cr6 (otherwise it will have to wait until someone picks it up - if no one from Snowflake does).

@eladkal added the area:providers and provider:snowflake (Issues related to Snowflake provider) labels Jun 15, 2022
@mik-laj
Member

mik-laj commented Jun 15, 2022

I would like to work on this.

Disclaimer: I work for Snowflake (@sfc-gh-kbregula), but I contribute to Airflow during non-business hours.

@sfc-gh-madkins
Contributor Author

@turbaszek

@sfc-gh-madkins
Contributor Author

@turbaszek @mik-laj The vision I had for this was to combine the Python operator with our Snowflake hook -- so in theory people get the same authentication experience as with the Snowflake operator, and the same experience as with the Python operator, by providing a "Snowpark" callable that receives the authenticated session variable.
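
For illustration only, a minimal sketch of that idea, assuming snowflake-snowpark-python's Session.builder API; the class name SnowparkOperator, the snowpark_callable parameter, and the way connection parameters are pulled from the hook are all assumptions here, not the provider's actual implementation:

    from typing import Callable

    from airflow.models import BaseOperator
    from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
    from snowflake.snowpark import Session


    class SnowparkOperator(BaseOperator):
        """Illustrative only: run a Python callable with an authenticated Snowpark session injected."""

        def __init__(self, *, snowpark_callable: Callable, snowflake_conn_id: str = "snowflake_default", **kwargs):
            super().__init__(**kwargs)
            self.snowpark_callable = snowpark_callable
            self.snowflake_conn_id = snowflake_conn_id

        def execute(self, context):
            hook = SnowflakeHook(snowflake_conn_id=self.snowflake_conn_id)
            # Assumption: the hook exposes its resolved connection parameters;
            # the exact helper name differs between provider versions.
            session = Session.builder.configs(hook._get_conn_params()).create()
            try:
                return self.snowpark_callable(session)
            finally:
                session.close()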

@mik-laj
Member

mik-laj commented Jun 21, 2022

I thought to just add Snowpark session creation support to the SnowflakeHook class. If the user uses the new TaskFlow API, we will have very simple and readable code.

    @task()
    def get_count():
        session = SnowflakeHook(
            snowflake_conn_id="conn-id"
        ).get_snowpark_session()
        return session.sql("SELECT count(*) FROM sample_product_data").collect()
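
A hedged sketch of what the hook side of that example might look like, assuming Snowpark's Session.builder; get_snowpark_session is the proposed name, not an existing method at the time of this comment, and the internal helper for connection parameters is also an assumption:

    from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook
    from snowflake.snowpark import Session


    class SnowflakeHookWithSnowpark(SnowflakeHook):
        def get_snowpark_session(self) -> Session:
            # Reuse the same connection parameters the hook already resolves for
            # the Python connector (helper name assumed for illustration).
            return Session.builder.configs(self._get_conn_params()).create()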

@sfc-gh-madkins
Contributor Author

sfc-gh-madkins commented Jun 21, 2022 via email

@turbaszek
Member

How about creating a @snowpark_task decorator, as we do for virtualenv and docker?

    def docker_task(
        python_callable: Optional[Callable] = None,
        multiple_outputs: Optional[bool] = None,
        **kwargs,
    ) -> "TaskDecorator":

    def virtualenv_task(
        python_callable: Optional[Callable] = None,
        multiple_outputs: Optional[bool] = None,
        **kwargs,
    ) -> TaskDecorator:

Of course such an operator would have to provide a session:

    @snowpark_task
    def count_user(session: Session):  # session is a required parameter
        session.sql(...)

Personally, I find this friendlier than instantiating hooks or using operators.

Apart from that, should we consider implementing a custom XCom backend based on reading query results using the query ID? This could be a really nice addition to the Snowpark + Airflow combo.
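
For reference, such a decorator could likely be wired the same way the docker and virtualenv decorators are, through Airflow's task_decorator_factory; in this sketch _SnowparkDecoratedOperator is a hypothetical stub, not an existing class:

    from typing import Callable, Optional

    from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory


    class _SnowparkDecoratedOperator(DecoratedOperator):
        """Hypothetical: would create a Snowpark session and pass it to the decorated callable."""


    def snowpark_task(
        python_callable: Optional[Callable] = None,
        multiple_outputs: Optional[bool] = None,
        **kwargs,
    ) -> TaskDecorator:
        return task_decorator_factory(
            python_callable=python_callable,
            multiple_outputs=multiple_outputs,
            decorated_operator_class=_SnowparkDecoratedOperator,
            **kwargs,
        )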

@sfc-gh-madkins
Contributor Author

sfc-gh-madkins commented Jun 22, 2022 via email

@mik-laj
Member

mik-laj commented Jun 22, 2022

We can add support for an optional snowflake_conn_id parameter.

    @snowpark_task(snowflake_conn_id='my-favourite-conn')
    def count_user(session: Session):  # session is a required parameter
        session.sql(...)

The official documentation recommends that custom decorators should be called task.CUSTOM, so we should introduce one change to @turbaszek's proposal

-@snowpark_task
+@task.snowpark
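
For context on the task.CUSTOM convention: custom decorators are exposed through the provider's metadata, roughly as the docker provider does. A hedged sketch of what the registration might look like here (the Snowflake module path is hypothetical):

    # Sketch of provider metadata registering a custom decorator so it is
    # available as @task.snowpark (the class-name path shown is hypothetical).
    def get_provider_info():
        return {
            "package-name": "apache-airflow-providers-snowflake",
            "name": "Snowflake",
            "task-decorators": [
                {
                    "name": "snowpark",
                    "class-name": "airflow.providers.snowflake.decorators.snowpark.snowpark_task",
                }
            ],
        }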

@sfc-gh-madkins
Contributor Author

sfc-gh-madkins commented Jun 22, 2022 via email

@turbaszek
Member

Good catch @mik-laj with the task.name convention!

@mik-laj
Member

mik-laj commented Jun 25, 2022

I prepared a draft PR: #24652
The implementation is complete, but I still want to add some documentation to improve the adoption of this feature, and system tests to avoid regressions.

@sfc-gh-madkins
Contributor Author

sfc-gh-madkins commented Jun 25, 2022 via email

@turbaszek
Member

I pulled the PR -- any advice on how to test it locally? Doesn't appear to be as easy as pip install -e

I would recommend using breeze - Airflow development environment:
https://github.com/apache/airflow/blob/main/BREEZE.rst#id26

@mik-laj
Member

mik-laj commented Jun 26, 2022

@sfc-gh-madkins I wonder if I should set QUERY_TAG and pass the following values:

Does Snowflake have any recommendations for ISVs on how to use the query_tag that we could apply here? Unfortunately, the public documentation describes this parameter only briefly, and I do not know the best practices for how it should be used.
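
As one illustration of what setting a query tag could look like (the tag layout with dag_id/task_id/run_id is only an assumption for the example, not a Snowflake recommendation): session parameters set on the hook are forwarded to the connector, so the tag would show up in Snowflake's query history.

    import json

    from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

    hook = SnowflakeHook(
        snowflake_conn_id="snowflake_default",
        # session_parameters are passed through to the Snowflake connector's connect() call
        session_parameters={
            "QUERY_TAG": json.dumps(
                {"dag_id": "example_dag", "task_id": "example_task", "run_id": "manual__2022-06-26"}
            )
        },
    )
    hook.run("SELECT 1")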

I also wonder what we can do to make Snowflake customers aware of this integration. Currently, support for Snowflake in Airflow is not mentioned in the end-customer Snowflake documentation. For example, we could add Apache Airflow to the ecosystem section.
There is also an integration for Apache Beam, but it is listed under the commercial name - Google Dataflow - so adding Apache Airflow under the commercial name - Cloud Composer - is one solution.
But Apache Airflow has more service providers, e.g. Amazon Managed Workflows for Apache Airflow (MWAA) or Astronomer. In addition, Apache Beam can also be run on-premises when using Apache Spark runner instead of Google Dataflow Runner, which also allows us to use SnowflakeIO for Apache Beam without Google Dataflow.

For comparison, dbt has explicitly described Apache Airflow, and they even published a guide on how to use Apache Airflow with dbt cloud.
https://docs.getdbt.com/guides/orchestration/airflow-and-dbt-cloud/1-airflow-and-dbt-cloud
https://docs.getdbt.com/docs/running-a-dbt-project/running-dbt-in-production

CC: @sfc-gh-pgancz, @sfc-gh-sleslie

@sfc-gh-madkins
Contributor Author

sfc-gh-madkins commented Jul 6, 2022 via email

@mik-laj
Member

mik-laj commented Jul 6, 2022

@sfc-gh-madkins I still have to add documentation and system tests.

@sfc-gh-madkins
Contributor Author

sfc-gh-madkins commented Jul 11, 2022 via email

@potiuk
Member

potiuk commented Jul 11, 2022

For providers - roughly monthly. I am planning to release new providers this week, so if it is not merged tomorrow or maybe the day after, it will have to wait for next month.

@sfc-gh-madkins
Contributor Author

sfc-gh-madkins commented Aug 2, 2022 via email

@sfc-gh-madkins
Contributor Author

sfc-gh-madkins commented Aug 3, 2022 via email

@mik-laj
Member

mik-laj commented Aug 3, 2022

Yesterday I was working on system tests. Now only the documentation remains, which I also hope to prepare within a week.

Would a customer be willing to test this PR before it is released by Apache? I can prepare a whl package that the user will have to install explicitly.

@sfc-gh-madkins
Contributor Author

sfc-gh-madkins commented Aug 3, 2022 via email

@sfc-gh-madkins
Contributor Author

sfc-gh-madkins commented Aug 16, 2022 via email

@mik-laj
Member

mik-laj commented Aug 17, 2022

I still need to check it out, but for now I don't see any reason why it shouldn't work on older versions of Apache Airflow.

@potiuk
Member

potiuk commented Aug 17, 2022

Just to explain the process, @sfc-gh-madkins (@mik-laj already knows most of it) - I think it would be good for you to understand it and to arrange your process with your customers accordingly, so that there are no surprises and you can adapt to the rules we agreed on in the community.

The rule we have for Airflow providers is that they should remain 2.2+ compatible until the 11th of October:
https://github.com/apache/airflow/blob/main/README.md#release-process-for-providers :

For example, this means that by default we upgrade the minimum version of Airflow supported by providers to 2.3.0 in the first provider release after the 11th of October 2022 (11th of October 2021 is the date when the first PATCHLEVEL of 2.2 (2.2.0) was released).

We attempt to test 2.2 compatibility with basically every PR - at least to the level of whether the provider "imports" correctly on Airflow 2.2: https://github.com/apache/airflow/runs/7872249712?check_suite_focus=true . The API between the providers and Airflow (not yet formalized, but it soon will be when we split providers out to separate repos) is rather stable, and we very carefully review any parts that might potentially break. But accidental breakages happen (which we fix when we find them). Ideally, those errors are detected during manual testing of RC candidates, and for that we kindly ask people employed by the stakeholders, and those involved in preparing the changes, to test them when we release RC candidates. There are at least 72 hours to test the releases, and you (including yourself, @sfc-gh-madkins) are heavily encouraged to run those tests when we release the provider. Involving your customers is also encouraged.

We announce it on the devlist (roughly monthly), with some early announcements that we are about to prepare a new wave of providers, and we create an issue like #25640 when RC candidates are ready, where you can comment and raise issues or confirm that things work as expected (the latter is very much encouraged :)). The last one is a good example where Databricks employees tested the Databricks provider and found a slight Airflow 2.2 compatibility issue (so we now have an RC4 candidate of the Databricks provider with it fixed). You can read that discussion to see how it can play out.

So - to summarise - the best way to be "sure" that a provider works on Airflow 2.2 is to help the community by testing the RC candidates on 2.2 - and possibly even engage your customers to run such testing with the release candidates. This is also a good answer for your customers - if they ask whether it works with an earlier Airflow version, the best answer is "yes, it is intended to, but the best way to make sure is to help with testing the RC candidate" - and inform them when the RC is out :). The RCs are installable from PyPI - same as any other versions - but they need to be installed deliberately, specifying the version to install.

One important note. As mentioned above, the last 2.2-compatible release of providers will be at the beginning of October, so the November wave of providers will have 2.3+ compatibility only. Then all providers will have their major version bumped.
There is a way for Snowflake (and any other stakeholder) to express willingness to help with cherry-picking some bugfixes and features to an earlier version of a provider from the previous line. So if Snowflake would like to do that, it is possible, but it would have to take on the burden of preparing a branch made of cherry-picked bugfixes and features and testing it before we release it. See https://github.com/apache/airflow/blob/main/README.md#release-process-for-providers for details.

@mik-laj
Member

mik-laj commented Aug 24, 2022

I prepared a test package: https://drive.google.com/drive/folders/1UQcXJuhiIhLR39y3qSlHII2LeCwyI-yh
I encourage everyone to test it.

@sfc-gh-madkins
Contributor Author

sfc-gh-madkins commented Oct 11, 2022 via email

@dhruvk12

dhruvk12 commented Sep 8, 2023

Any update on this? I am happy to write something and raise the PR this weekend!

@sfc-gh-madkins
Contributor Author

@dhruvk12 please do :)

@ryaminal

ryaminal commented Sep 28, 2023

hi all.
super excited for an easier way to do snowpark in airflow.

looks like the PR mentioned above has gone stale because of the cloudpickle stuff in snowpark. seems related to this snowpark PR that has also gone stale.

there also appears to be an initiative from astronomer to do a similar thing but the demo repo appears to depend on an archived underlying provider.

as someone interested in this, what are the next steps here? has someone already made a community fork of this to get around the cloudpickle+python3.8 issues in the original PR? is there interest in that community fork?

@ryaminal

maybe @kaxil is able to answer the question above about the astronomer packages.

@mpgreg

mpgreg commented Oct 27, 2023

@ryaminal I have started a PR at https://github.com/mpgreg/airflow/tree/main/airflow/providers/snowflake

@mik-laj @sfc-gh-madkins I also ran into the same cloudpickle dependency issues. Anything you can do to help move snowflakedb/snowpark-python#975 along?

    10.65     apache-airflow[devel-ci] 2.8.0.dev0 depends on cloudpickle>=1.4.1
    10.65     apache-beam 2.51.0 depends on cloudpickle~=2.2.1
    10.65     dask 2023.4.1 depends on cloudpickle>=1.5.0
    10.65     distributed 2023.4.1 depends on cloudpickle>=1.5.0
    10.65     snowflake-snowpark-python 1.9.0 depends on cloudpickle<=2.0.0 and >=1.6.0; python_version < "3.11"
    10.65     apache-airflow[devel-ci] 2.8.0.dev0 depends on cloudpickle>=1.4.1
    10.65     apache-beam 2.51.0 depends on cloudpickle~=2.2.1
    10.65     dask 2023.4.1 depends on cloudpickle>=1.5.0
    10.65     distributed 2023.4.1 depends on cloudpickle>=1.5.0
    10.65     snowflake-snowpark-python 1.8.0 depends on cloudpickle<=2.0.0 and >=1.6.0
    10.65     apache-airflow[devel-ci] 2.8.0.dev0 depends on cloudpickle>=1.4.1
    10.65     apache-beam 2.51.0 depends on cloudpickle~=2.2.1
    10.65     dask 2023.4.1 depends on cloudpickle>=1.5.0
    10.65     distributed 2023.4.1 depends on cloudpickle>=1.5.0
    10.65     snowflake-snowpark-python 1.7.0 depends on cloudpickle<=2.0.0 and >=1.6.0
    10.65     The user requested (constraint) cloudpickle==2.2.1

@mik-laj
Member

mik-laj commented Oct 27, 2023

Does the problem still occur when you use the RC version of the Snowflake connector? That version doesn't depend on apache-arrow, so it's less risky.

@ryaminal

@mik-laj, the RC version depends on nanoarrow, right? I don't think that changes the dependency on cloudpickle, does it?

@mik-laj
Member

mik-laj commented Oct 30, 2023

This changes the situation a bit, because Apache Beam requires a specific version of Apache Arrow and Cloudpickle, so there is a chance that if the Snowflake connector does not depend on apache-arrow, we will be able to find a matching version of Apache Beam. However, this is a theory that we need to test in practice.

@mpgreg

mpgreg commented Oct 31, 2023

I set snowflake-connector-python>=3.3.1 and breeze now starts okay. Haven't run any unit/integration tests.

@dhruvk12

dhruvk12 commented Oct 31, 2023

@dhruvk12 please do :)

sorry missed this, what is the current status of this? @mpgreg

@mpgreg

mpgreg commented Nov 1, 2023

There are two separate PRs here. Actually, I haven't made a PR yet, so technically only one. But @mik-laj it would be great if you could take a look at my decorators and operators to see if it makes sense to merge them with your PR.

At a high level, I built from the Python virtualenv operator. I built this when Snowpark only supported Python 3.8 and there were other dependency issues with Airflow. Snowpark now supports 3.8-3.11, but in my experience there will likely always be challenges with major versions. Inheriting from the virtualenv operator will simplify dependency management.

I also created a SnowparkTable dataclass in order to serialize/deserialize Snowpark DataFrames passed to/from a task. This is needed because the DataFrames are associated with the Snowpark Session, which doesn't (or didn't at the time) serialize. So the user can't create a session in the DAG and pass it between tasks. Instead, a new session is created for each task, any SnowparkTable arguments passed to the task are instantiated as Snowpark DataFrames in the session, and any Snowpark DataFrames returned from the task are serialized to Snowflake as tables or stage objects (user's choice).
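
A minimal sketch of that SnowparkTable idea, with illustrative field names and helpers (not the actual implementation in the branch above), assuming standard Snowpark APIs:

    from dataclasses import dataclass

    from snowflake.snowpark import DataFrame, Session


    @dataclass
    class SnowparkTable:
        name: str  # fully qualified table name, e.g. DB.SCHEMA.TABLE
        conn_id: str = "snowflake_default"

        def to_dataframe(self, session: Session) -> DataFrame:
            # Re-hydrate the reference inside the task's own Snowpark session.
            return session.table(self.name)


    def dataframe_to_table(df: DataFrame, name: str) -> SnowparkTable:
        # Persist the DataFrame so a downstream task can re-read it by name.
        df.write.save_as_table(name, mode="overwrite")
        return SnowparkTable(name=name)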

The creation of the Snowpark session and the ser/des of SnowparkTable objects are accomplished in a new virtualenv Jinja template.

Lastly, and somewhat unrelated, I created a new custom XCom backend for Snowflake. Some users (specifically in regulated industries) wanted an option to keep all data in Snowflake, so all task input/output is saved to Snowflake objects (tables or stages) and only a URI (snowflake://<ACCOUNT>.<REGION>/<DATABASE>/<SCHEMA>?table=<TABLE>&key=<KEY> or snowflake://<ACCOUNT>.<REGION>/<DATABASE>/<SCHEMA>?stage=<STAGE>&key=<FILE_PATH>) is passed to Airflow XCom. Small, JSON-serializable objects are serialized to a single XCom table in Snowflake with a schema similar to the Airflow XCom table. Non-JSON-serializable objects, or objects bigger than the 16 MB Snowflake limit, are serialized to a stage.
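
For illustration, the URI format described above could be built roughly like this (the helper name and defaults are assumptions, not part of the backend itself):

    from typing import Optional
    from urllib.parse import urlencode


    def build_snowflake_xcom_uri(
        account: str,
        region: str,
        database: str,
        schema: str,
        key: str,
        table: Optional[str] = None,
        stage: Optional[str] = None,
    ) -> str:
        # Either a table reference or a stage reference, plus the XCom key / file path.
        # e.g. snowflake://my_account.us-east-1/MY_DB/MY_SCHEMA?table=XCOM_TABLE&key=return_value
        query = {"table": table, "key": key} if table else {"stage": stage, "key": key}
        return f"snowflake://{account}.{region}/{database}/{schema}?{urlencode(query)}"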

Operators

  • SnowparkPythonOperator: This is the simplest operator; it runs as a PythonOperator in the Airflow instance. This requires that the Airflow instance is running a version of Python supported by Snowpark and has the Snowpark Python package installed. NOTE: Currently Snowpark supports Python 3.8, 3.9, 3.10, and 3.11.
  • SnowparkVirtualenvOperator: This operator creates a Python virtualenv to run the Python callable in a subprocess. Users can specify Python package requirements (e.g. snowflake-snowpark-python).
  • SnowparkExternalPythonOperator: This operator runs the Snowpark Python callable in a pre-existing virtualenv. It is assumed that Snowpark is already installed in that environment.
  • SnowparkPythonUDFOperator: (TBD)
  • SnowparkPythonSPROCOperator: (TBD)

Decorators

  • snowpark_python_task: TaskFlow decorator for SnowparkPythonOperator (i.e. @task.snowpark_python())
  • snowpark_virtualenv_task: TaskFlow decorator for SnowparkVirtualenvOperator (i.e. @task.snowpark_virtualenv())
  • snowpark_ext_python_task: TaskFlow decorator for SnowparkExternalPythonOperator (i.e. @task.snowpark_ext_python())

@dhruvk12

dhruvk12 commented Nov 1, 2023

Looks good @mpgreg

@dhruvk12

dhruvk12 commented Nov 4, 2023

I am just curious how the testing of the operator will take place? Any tests written for this?

@sfc-gh-madkins
Contributor Author

@mpgreg let's push this one over the finish line!

@eladkal
Contributor

eladkal commented Mar 30, 2024

I think it's very unlikely that the community will push it. It has been open for almost 2 years.
If Snowflake wants to have it, then someone from Snowflake needs to prioritize this task.
I am happy to help with review.

I think we had better close this issue, as it is probably a better fit for the Snowflake backlog rather than the community one. If someone thinks differently, we can reopen.

@eladkal closed this as not planned Apr 2, 2024
@sfc-gh-jdu
Contributor

Hi there, I'm Jianzhun from Snowflake (Snowpark Engineering team), and I would like to pick up this issue and get Snowpark supported officially in Airflow! May we re-open this issue?

@kaxil
Member

kaxil commented Aug 2, 2024

@sfc-gh-jdu Great to hear that --- it's a long-standing issue -- eagerly waiting to see it through the finish line.

@sfc-gh-jdu
Contributor

sfc-gh-jdu commented Sep 27, 2024

Hi folks, FYI the first PR is ready for review in #42457. Any comments are appreciated!
