[CT-431] Support for Pyspark driver #305

Closed
cccs-jc opened this issue Mar 27, 2022 · 22 comments
Labels
enhancement (New feature or request), Stale

Comments

@cccs-jc

cccs-jc commented Mar 27, 2022

Describe the feature

Add a fourth connection option to the dbt adapter. This fourth connection method would create a pyspark context and use the spark.sql(<sql>) function to execute SQL statements.

Describe alternatives you've considered

The alternative is to run your own Thrift server, which is difficult to set up.

Who will this benefit?

A pyspark context gives you control over the Spark application configuration and the amount of resources to use in the Spark cluster. It also enables the user to register custom pyspark UDFs and to create custom views based on pyspark Dataframes.

  1. control Spark job creation
  2. register custom pyspark UDFs
  3. register custom views based on pyspark Dataframes

Are you interested in contributing this feature?

Yes for sure. I have an implementation of this feature which I would like to contribute to this project.

Here's how it works from the user's point of view.

Specify a new connection method pyspark

profiles.yml

project1:
  target: dev
  outputs:
    dev:
      type: spark
      method: pyspark
      python_module: spark.spark_context

The python_module points to a Python file found on the PYTHONPATH, in this case spark/spark_context.py. The pyspark adapter calls the create_spark_context() function found in this file. The user can thus create their own Spark context (either a local instance or one that leverages a cluster). This hook also lets the user register custom pyspark UDFs.

spark/spark_context.py

from pyspark.sql import SparkSession

def create_spark_context():
    return SparkSession.builder.getOrCreate()
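
For a cluster-backed setup, the same hook can also size the Spark job and register UDFs before returning the session. A minimal sketch of what that might look like; the app name, resource settings, and the mask_last_octet UDF are illustrative values only, not part of the proposed implementation:

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType

def create_spark_context():
    # Illustrative cluster/resource configuration; adjust to your environment.
    spark = (
        SparkSession.builder
        .appName("dbt-pyspark")
        .config("spark.executor.memory", "4g")
        .config("spark.executor.cores", "2")
        .getOrCreate()
    )

    # Example custom UDF registration, callable from dbt's SQL models.
    spark.udf.register(
        "mask_last_octet",
        lambda ip: None if ip is None else ip.rsplit(".", 1)[0] + ".x",
        StringType(),
    )
    return spark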

A second hook is available in the sources. This hook lets the user register a custom pyspark Dataframe as an SQL view via df.createOrReplaceTempView(<view name>). The user is free to create any Dataframe logic they need.

sources.yml

sources:
  - name: custom
    tables:
    - name: raw_users
      meta:
        python_module: models.staging.raw_users

The python_module is loaded the same way except that the hook function has a different signature.

def create_dataframe(spark, start_time, end_time)

Using pyspark you can work around the limitations of Spark SQL when reading data files. For example, you can load the schema of the JSON you are going to read, avoiding Spark's schema discovery process and making sure the data is read with the schema you want.

models/staging/raw_users.py

import json
from pyspark.sql.types import StructType

def create_dataframe(spark, start_time, end_time):
    data = < load schema from a file>
    schema = StructType.fromJson(json.loads(data))

    df = (spark
        .read
        .schema(schema)
        .format("json")
        .load(f"{WAREHOUSE}/{YEAR}/{MONTH}/{DAY}/users/*.logs")
    )
    return df

The implementation of this feature consists of a new PysparkConnectionWrapper, which executes SQL statements via the Spark context:

def execute(self, sql, bindings=None):
    self.result = self.spark.sql(sql)
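
Around that, the wrapper just holds the session and the last result DataFrame. A rough sketch of the shape; the fetchall method is an assumption about what dbt's cursor-like interface needs, not the exact code in the PR:

class PysparkConnectionWrapper:
    def __init__(self, spark):
        self.spark = spark
        self.result = None

    def execute(self, sql, bindings=None):
        # Run the statement through the live SparkSession
        self.result = self.spark.sql(sql)

    def fetchall(self):
        # Materialize rows when dbt needs results (e.g. metadata queries)
        return self.result.collect() if self.result is not None else []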

And a new method on SparkRelation which registers pyspark Dataframes as SQL views:

import importlib

def load_python_module(self, start_time, end_time):
    path = self.meta.get('python_module')
    module = importlib.import_module(path)
    create_dataframe = getattr(module, "create_dataframe")
    # `spark` is the session created by the pyspark connection method
    df = create_dataframe(spark, start_time, end_time)
    df.createOrReplaceTempView(self.identifier)

Registration of pyspark views is initiated by a modified source macro

{% macro source(source_name, identifier, start_dt = None, end_dt = None) %}
    {%- set relation = builtins.source(source_name, identifier) -%}
    {%- if execute and (relation.source_meta.python_module or relation.meta.python_module) -%}

        {%- do relation.load_python_module(start_dt, end_dt) -%}

        {%- do return(relation.identifier) -%}
    {% else -%}
        {%- do return(relation) -%}
    {% endif -%}
{% endmacro %}
@cccs-jc cccs-jc added enhancement New feature or request triage labels Mar 27, 2022
@github-actions github-actions bot changed the title Support for Pyspark driver [CT-431] Support for Pyspark driver Mar 27, 2022
@jtcohen6 jtcohen6 self-assigned this Mar 28, 2022
cccs-jc pushed a commit to cccs-jc/dbt-spark that referenced this issue Mar 30, 2022
@cccs-jc
Author

cccs-jc commented Mar 30, 2022

created a pull request #308

@jtcohen6
Contributor

@cccs-jc Really interesting! Thanks for opening the issue, and the accompanying PR.

We just merged a new connection method (session) that can leverage a PySpark session (#279). I'd be very curious to get your eyes on that implementation (and @JCZuurmond's eyes on yours, in #308). What are the crucial differences between the two approaches? For my part, I'm not convinced that dbt's role should be to manage the Spark session's instantiation/configuration, versus simply being able to connect to one that already exists.

As always, the questions are:

  • Which generally applicable guardrails do we want to keep in place? (separation of concerns between dbt + database "infrastructure")
  • Where does Spark blur those boundaries in a way that gets us more than what's available elsewhere? (all-on-one-machine "unit" testing of macro/materialization/etc. functionality)

models/staging/raw_users.py

Neat :) We're definitely thinking about what it might look like to implement "dataframe" support in dbt + Spark/Databricks. I don't think a dbt-managed pyspark session is the way we'd want to go, though.

@jtcohen6 jtcohen6 removed the triage label Mar 30, 2022
@jtcohen6 jtcohen6 removed their assignment Mar 30, 2022
@cccs-jc
Author

cccs-jc commented Mar 30, 2022

@JCZuurmond's implementation is very close to mine. The main differences are that mine has these additional features:

A hook to create a custom Spark session

  • In this hook you can create a Spark context which specifies the RAM/CPU to use from a cluster, configure Iceberg catalogs, etc.
  • In this hook you can also register custom UDFs which can be used in SQL transformations

The only way I know to register a UDF via SQL requires the Python package to be installed on a pre-existing server, which is very inconvenient.

A hook to register a custom view

  • This hook lets you register a pyspark Dataframe as a dbt source, which lets you work around the limitations of Spark SQL. For example, using pyspark you can load the schema of the JSON you are going to read, avoiding Spark's schema discovery process and making sure the data is read with the schema you want.

Creating views in SQL (dbt-external-tables) is possible, but it has limitations which the programmatic approach does not.

Being able to create custom Python UDFs and views is important to us. I'm curious to know what alternatives you are thinking about.

@JCZuurmond
Collaborator

JCZuurmond commented Mar 30, 2022

@cccs-jc : Great minds think alike! 💯

A hook to create custom spark session.

You solve this by first creating the Spark session with the configuration that you prefer. Then, SparkSession.builder.getOrCreate() will pick up this existing session for you. There is no hook needed.
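
Something along these lines (a sketch; the config key and UDF are only examples) works without any adapter hook, as long as dbt is invoked from the same Python process:

from pyspark.sql import SparkSession

# Create and configure the session up front...
spark = (
    SparkSession.builder
    .appName("my-dbt-run")
    .config("spark.sql.shuffle.partitions", "64")
    .getOrCreate()
)
spark.udf.register("my_udf", lambda x: x)

# ...anything that later calls builder.getOrCreate(), such as the session
# connection method, reuses this same session rather than creating a new one.
assert SparkSession.builder.getOrCreate() is spark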

A hook to register a custom view

Why not do this outside of dbt, or right before you call dbt programmatically with the Spark session approach? The example you give about the JSON schema is solved by using dbt-external-tables.

If you have improvements to the current implementation, feel free to add changes to the session module!

@cccs-jc
Author

cccs-jc commented Mar 31, 2022

Our initial proof of concept used the approach you mention: a modified dbt launch script which creates a Spark context and registers views. Then the dbt adapter retrieves the session and executes SQL.

The issues with this approach are:

  • You need a modified dbt launch script, so if you want to use the vscode-power-dbt extension you need a modified dbt-core package.
  • Since the dbt adapter retrieves the Spark session and is already aware of Spark sessions, why not have it use a hook to create one?
  • Registering views can take a bit of time. When you have a few dozen views and you launch dbt for a specific model, why register all of them? The model you run might not even use any of the views. It seems to make more sense to create the views as they are needed, hence the hook into the source processing.

I'm aware of dbt-external-tables, however it's not as flexible as pyspark code. We built pyspark libraries to create Dataframes out of our raw data (hundreds of tables) and would like to leverage them within dbt.

Also, I'm not sure dbt-external-tables could handle use cases like reading from Azure Kusto: https://docs.microsoft.com/en-us/azure/data-explorer/spark-connector

The improvements I would like to contribute are hooks into Spark session creation and view creation.

@JCZuurmond
Collaborator

Hi @cccs-jc, excuse the late reply. I want to help you solve your issue whilst finding a way to not add overly specific code to the code base. Note that I am not able to approve PRs; that is up to the dbt folks.

Spark session creation
There are other ways to configure your Spark session. You could do this at the cluster level, e.g. when using Databricks, or in spark-defaults.conf.

Will that help you?

Hooks
I like dbt hooks, but adding hooks to one connection method is confusing: why only for the Spark session connection? Also, I prefer an API that is agnostic to what the hook is about, i.e. not create_dataframe but run_pre_hook. I created an issue for this on dbt-core.

Will that help you?

dbt-external-tables
I understand that pySpark feels more flexible, though I think you can implement a lot in Jinja. I have not tested Azure Kusto; you might be able to translate the example in the docs to Spark SQL.

dbt-external-tables is already pushing the limits of dbt. dbt is a SQL-based transformation tool; the fundamental assumption is that there are source models present to transform. This is generally true in the database world, but in the data warehouse world the assumption is slightly different: data is stored at various locations (e.g. files or Azure Kusto) without the necessary source tables referencing the data. When using dbt-spark I recommend you think thoroughly about how you create your source data models. dbt-external-tables helps, but it might not cover all your cases; I do not always use it, and I sometimes consider using something other than dbt to create the sources.

@cccs-jc
Author

cccs-jc commented Apr 14, 2022

I'm reconsidering the way I've been creating a Spark session. One of the issues I have at the moment is that I'm creating a brand new Spark session for every dbt run, which is a bit slow. I've come to realize that I could separate the creation of the Spark JVM from the creation of the pyspark client.

I can launch the PythonGatewayServer and give it a file location where it writes the listening port and secret of the gateway:

export _PYSPARK_DRIVER_CONN_INFO_PATH=/tmp/DbtPySparkShell
spark-submit --name DbtPySparkShell pyspark-shell

This will start the pyspark-shell JVM and wait for a Python client to connect to the gateway.

I can then extract the port and secret from this file. Here's an example in Python that does that:

from pyspark.serializers import read_int, UTF8Deserializer
conn_info_file='/tmp/DbtPySparkShell'
info = open(conn_info_file, "rb")
gateway_port = read_int(info)
gateway_secret = UTF8Deserializer().loads(info)

Setting these environment variables tells pyspark.SparkContext.getOrCreate() to connect to the existing driver JVM:

export PYSPARK_GATEWAY_PORT=
export PYSPARK_GATEWAY_SECRET=
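
Putting the pieces together in Python, here's a sketch of the client side of this approach (same example file path as above; it assumes PySpark reads these environment variables when the context is created, as described):

import os
from pyspark.serializers import read_int, UTF8Deserializer
from pyspark.sql import SparkSession

# Read the port and secret written by the pre-launched gateway JVM
with open("/tmp/DbtPySparkShell", "rb") as info:
    gateway_port = read_int(info)
    gateway_secret = UTF8Deserializer().loads(info)

# Point the pyspark client at the existing JVM instead of launching a new one
os.environ["PYSPARK_GATEWAY_PORT"] = str(gateway_port)
os.environ["PYSPARK_GATEWAY_SECRET"] = gateway_secret

# getOrCreate() now attaches to the already-running driver JVM
spark = SparkSession.builder.getOrCreate()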

So the new dbt-spark session mode will connect to an existing JVM process.

Note it's important to close the connection. Failing to do so will cause the next client to fail to connect. Do you currently close the Spark context?

This makes it much faster to run dbt commands since the Python client only connects to an already running Spark JVM.

I still like the hook we have to create pyspark based views. We might just keep this feature in our version of dbt-spark for now.

@JCZuurmond
Collaborator

We do not close the Spark session. I think we should add this to the Cursor.close

@JCZuurmond
Collaborator

@cccs-jc : Do you want to contribute closing the Spark connection? And will that solve your questions?

@cccs-jc
Author

cccs-jc commented Apr 25, 2022

I don't have the time to contribute this at the moment. Does it resolve my questions? Yes and no. I can work around the limitation of the Spark context creation, but I'd still like to be able to create "views", so the hook in the source creation is still of interest.

For now I'll keep our fork of dbt-spark which includes these hooks. Maybe these will become available in a more general form in dbt core as a pre-hook or something similar.

@kbendick

kbendick commented Jun 9, 2022

Hi all. Is the spark session still not closed? I could potentially help out with this.

@JCZuurmond In the cursor class, is the dataframe that's used and its associated spark session safe to close when the cursor is closed?

e.g. when Cursor.close() or the cursor's __exit__ function is called, is that the end of that spark session life cycle or is it still needed?

@kbendick

kbendick commented Jun 9, 2022

Oh nevermind I wasn't aware that #308 would potentially resolve this. Let me know if I'm wrong.

Otherwise, the active spark session can be easily pulled off of the dataframe itself (though I don't know if it's necessarily safe to close it right away).
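
For illustration, a sketch of what that could look like inside the cursor's close; the _df attribute name is an assumption about the cursor's internals, not the adapter's actual code:

def close(self) -> None:
    # Stop the session attached to the cursor's result DataFrame (assumed
    # to be held on self._df); safe only if nothing else still needs it.
    if self._df is not None:
        self._df.sql_ctx.sparkSession.stop()
        self._df = None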

@JCZuurmond
Collaborator

@kbendick: I do not expect that we will merge #308. If you want, you could add the closing of the cursor. I think it makes sense to close the session in Cursor.close. @jtcohen6: Are connections generally closed on the cursor or on the connection?

@kbendick

kbendick commented Jun 9, 2022

> @kbendick : I do not expect that we will merge #308 , if you want you could add the closing of the cursor. I think it makes sense to close the session in the Cursor.close. @jtcohen6 : Are the connections generally closed on the cursor or on the connection?

It really depends on whether the Spark session associated with the dataframe inside the cursor is going to be used any more once the cursor is exhausted/closed.

If there are tests, I can just open a PR and we'll see?

@JCZuurmond
Collaborator

Sounds like a plan!

And the data frame is not used outside the class. I think it fits dbt's standard workflow: after a cursor is closed, no methods are called on the cursor anymore.

@jtcohen6
Contributor

Conceptually, dbt expects to close connections, and the close mechanism is often implemented on the connection handle. We don't want to accidentally leave connections open if they're not needed. Depending on the connection type, the underlying mechanism may or may not need to be actually implemented on the cursor. E.g.:

def close(self):
    if self._cursor:
        # Handle bad response in the pyhive lib when
        # the connection is cancelled
        try:
            self._cursor.close()
        except EnvironmentError as exc:
            logger.debug("Exception while closing cursor: {}".format(exc))
    self.handle.close()

@kbendick

Sorry for the late reply on this.

Got sidetracked for a bit. I will go ahead and open a PR just trying to close the Spark session when the cursor is closed.

Given how shared the Spark session is (typical applications have one Spark session throughout their whole lifetime, but I can't say for sure yet about dbt-spark), I'm not sure if this approach is really going to work, but I think it is worth investigating and giving it a try.

@kbendick

If Spark 2.x weren't supported, we could simply use SparkSession.getActiveSession(), but that method was only added in Spark 3.0.0.

Am I correct in my understanding that Spark 2.4.x is still supported?
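
For reference, a hedged sketch of a version-tolerant lookup; the 2.4 fallback via builder.getOrCreate() is an assumption about how one might bridge both versions, not the adapter's code:

from pyspark.sql import SparkSession

spark = None
if hasattr(SparkSession, "getActiveSession"):  # available from Spark 3.0
    spark = SparkSession.getActiveSession()
if spark is None:
    # Spark 2.4-compatible fallback: returns the already-running session
    # if one exists, instead of relying on getActiveSession().
    spark = SparkSession.builder.getOrCreate()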

@jtcohen6
Contributor

jtcohen6 commented Jun 16, 2022

@kbendick We are planning to add functionality in a forthcoming version of dbt-spark that will require Spark v3 (#342), if we can unblock ourselves in the upgrade for CI. I see you put a comment on #349 (comment) offering to help, which is much appreciated :)

At that point, future versions of dbt-spark will expect to be using Spark v3.

@JCZuurmond
Collaborator

@kbendick : Are you still interested in creating the PR?

@cccs-jc
Author

cccs-jc commented Aug 14, 2022

I don't really have time at the moment. I thought the cursor close was added already?

@github-actions
Contributor

This issue has been marked as Stale because it has been open for 180 days with no activity. If you would like the issue to remain open, please remove the stale label or comment on the issue, or it will be closed in 7 days.
