Feature/python model v1 #377

Merged
40 commits merged on Jul 28, 2022

Commits (40)

ae4c60f
first databrick implementation
ChenyuLInx May 3, 2022
ecece22
add cell to notebook
ChenyuLInx May 5, 2022
77dcf6b
proper return run result
ChenyuLInx May 5, 2022
a4211a9
properly make function available
ChenyuLInx May 5, 2022
2e2cae1
ref return df
ChenyuLInx May 6, 2022
d69fe4c
Bump version to 1.3.0a1
jtcohen6 May 6, 2022
c79991d
Small quality of life fixups
jtcohen6 May 6, 2022
ccdc170
update more result
ChenyuLInx May 6, 2022
4c9f522
Merge pull request #2 from dbt-labs/jerco/update-version
ChenyuLInx May 6, 2022
be2b0a2
fix format
ChenyuLInx May 10, 2022
4195ccd
better error handling for api call and target relation templating
ChenyuLInx May 31, 2022
98f60e7
fix format
ChenyuLInx May 31, 2022
0a6e673
fix format
ChenyuLInx Jun 1, 2022
c29867e
add functional test
ChenyuLInx Jun 2, 2022
cb5ba0d
Existing python model code
ChenyuLInx Jun 14, 2022
f87a30b
first pass
iknox-fa Jun 16, 2022
d6ac3b9
cleanup , pt 1
iknox-fa Jun 16, 2022
ca04f35
cleaned up incremental logic
iknox-fa Jun 23, 2022
d639594
cleanup, add is_incremental
iknox-fa Jun 24, 2022
f5c178e
remove debug logging
iknox-fa Jun 24, 2022
7a44feb
flake8
iknox-fa Jun 27, 2022
d7b06d4
removed python lang from temp views for now
iknox-fa Jun 27, 2022
4d4ae51
add change schema test
ChenyuLInx Jun 17, 2022
8b95b2e
removed log line
iknox-fa Jun 27, 2022
2e18ae3
Merge pull request #3 from dbt-labs/feature/python-model-v1-incremental
iknox-fa Jun 27, 2022
a758930
more restriction and adjust syntax
ChenyuLInx Jun 28, 2022
88b7ad4
adjust name for incremental model
ChenyuLInx Jun 28, 2022
85a49ae
stage changes
ChenyuLInx Jun 29, 2022
3ee0a42
fixed it
ChenyuLInx Jun 29, 2022
b157b6e
update function args and adjust more names
ChenyuLInx Jun 29, 2022
d596866
remove unneeded macro
ChenyuLInx Jul 15, 2022
27c1441
mimic result for python job
ChenyuLInx Jul 19, 2022
9eea396
fix python model test (#406)
ChenyuLInx Jul 26, 2022
cc003d9
Merge branch 'main' into feature/python-model-v1
ChenyuLInx Jul 26, 2022
c907a00
make black happy
ChenyuLInx Jul 26, 2022
0aebf04
enable python model test (#409)
ChenyuLInx Jul 27, 2022
6efee9e
skip test that failed on main
ChenyuLInx Jul 27, 2022
7e8943b
add comment to run code
ChenyuLInx Jul 28, 2022
34f144e
using core code and bring back incremental test
ChenyuLInx Jul 28, 2022
fa303d9
add changelog
ChenyuLInx Jul 28, 2022
1 change: 1 addition & 0 deletions .github/workflows/main.yml
@@ -60,6 +60,7 @@ jobs:
python -m pip install pre-commit
pre-commit --version
python -m pip install mypy==0.942
python -m pip install types-requests
mypy --version
python -m pip install -r requirements.txt
python -m pip install -r dev-requirements.txt
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,8 @@
## dbt-spark 1.3.0b1 (Release TBD)

### Features
- Support Python models through notebooks; currently supported materializations are table and incremental. ([#377](https://github.com/dbt-labs/dbt-spark/pull/377))

### Fixes
- Pin `pyodbc` to version 4.0.32 to prevent overwriting `libodbc.so` and `libltdl.so` on Linux ([#397](https://github.com/dbt-labs/dbt-spark/issues/397/), [#398](https://github.com/dbt-labs/dbt-spark/pull/398/))

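For context, a user-facing Python model of the kind this feature enables might look roughly like the following. This is a minimal sketch based on dbt's def model(dbt, session) convention; the model name, the upstream ref, and the column are made up for illustration.

import pyspark.sql.functions as F

def model(dbt, session):
    # Table is one of the two materializations supported by this PR.
    dbt.config(materialized="table")

    # Hypothetical upstream model; dbt resolves the ref to a Spark DataFrame.
    df = dbt.ref("upstream_model")

    # Return a DataFrame; the materialization writes it out as a table.
    return df.withColumn("loaded_at", F.current_timestamp())
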
127 changes: 121 additions & 6 deletions dbt/adapters/spark/impl.py
@@ -1,4 +1,7 @@
import re
import requests
import time
import base64
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Union
@@ -11,7 +14,8 @@
import dbt.exceptions

from dbt.adapters.base import AdapterConfig
from dbt.adapters.base.impl import catch_as_completed
from dbt.adapters.base.impl import catch_as_completed, log_code_execution
from dbt.adapters.base.meta import available
from dbt.adapters.sql import SQLAdapter
from dbt.adapters.spark import SparkConnectionManager
from dbt.adapters.spark import SparkRelation
@@ -159,11 +163,9 @@ def list_relations_without_caching(

return relations

def get_relation(
self, database: Optional[str], schema: str, identifier: str
) -> Optional[BaseRelation]:
def get_relation(self, database: str, schema: str, identifier: str) -> Optional[BaseRelation]:
if not self.Relation.include_policy.database:
database = None
database = None # type: ignore

return super().get_relation(database, schema, identifier)

@@ -296,7 +298,12 @@ def get_catalog(self, manifest):
for schema in schemas:
futures.append(
tpe.submit_connected(
self, schema, self._get_one_catalog, info, [schema], manifest
self,
schema,
self._get_one_catalog,
info,
[schema],
manifest,
)
)
catalogs, exceptions = catch_as_completed(futures)
@@ -380,6 +387,114 @@ def run_sql_for_tests(self, sql, fetch, conn):
finally:
conn.transaction_open = False

@available.parse_none
@log_code_execution
def submit_python_job(self, parsed_model: dict, compiled_code: str, timeout=None):
# TODO improve the typing here. N.B. Jinja returns a `jinja2.runtime.Undefined` instead
# of `None` which evaluates to True!

# TODO limit this function to run only when doing the materialization of python nodes

# assuming that for python job running over 1 day user would mannually overwrite this
schema = getattr(parsed_model, "schema", self.config.credentials.schema)
identifier = parsed_model["alias"]
if not timeout:
timeout = 60 * 60 * 24
if timeout <= 0:
raise ValueError("Timeout must larger than 0")

auth_header = {"Authorization": f"Bearer {self.connections.profile.credentials.token}"}

# create new dir
if not self.connections.profile.credentials.user:
raise ValueError("Need to supply user in profile to submit python job")
Collaborator:
What user is expected to be used in the automated/scheduled dbt jobs for the production system? I think this implies a user should be created for that system.

Contributor Author:
The initial thought is that the user would be the Databricks user who created the token used for the production environment. But following the discussion in the above thread, if we pivot to doing spark_python_task, then this could be a different setup in production (configs needed for S3 or DBFS).

Collaborator:
Let's continue the discussion in the other thread. I would be in favor of not requiring a user to be stated.

# it is safe to call mkdirs even if dir already exists and have content inside
work_dir = f"/Users/{self.connections.profile.credentials.user}/{schema}"
response = requests.post(
f"https://{self.connections.profile.credentials.host}/api/2.0/workspace/mkdirs",
headers=auth_header,
json={
"path": work_dir,
},
)
if response.status_code != 200:
raise dbt.exceptions.RuntimeException(
f"Error creating work_dir for python notebooks\n {response.content!r}"
)

# add notebook
b64_encoded_content = base64.b64encode(compiled_code.encode()).decode()
response = requests.post(
f"https://{self.connections.profile.credentials.host}/api/2.0/workspace/import",
headers=auth_header,
json={
"path": f"{work_dir}/{identifier}",
"content": b64_encoded_content,
"language": "PYTHON",
"overwrite": True,
"format": "SOURCE",
},
)
if response.status_code != 200:
raise dbt.exceptions.RuntimeException(
f"Error creating python notebook.\n {response.content!r}"
)

# submit job
submit_response = requests.post(
f"https://{self.connections.profile.credentials.host}/api/2.1/jobs/runs/submit",
headers=auth_header,
json={
"run_name": "debug task",
"existing_cluster_id": self.connections.profile.credentials.cluster,
"notebook_task": {
Collaborator (@JCZuurmond, Aug 8, 2022):
Why not use the spark_python_task? IMHO it is cleaner than notebooks; also, I expect you would not require the user to be stated when using the spark_python_task.

Contributor Author (@ChenyuLInx):
Hey @JCZuurmond, thanks for pointing me to another method here!
This method is being used because it leaves a notebook after the run that you can play with and iterate on your Python model code there. But I do agree that this is more suitable during the development phase.
I looked up spark_python_task; it seems that if we want to do it that way, we will still need to upload the Python file somewhere (S3 or DBFS) and pass in the path here. In that case we will need extra setup to put that Python file there, vs. currently we require the extra user (also mentioned in the next comment) to create a folder and put the notebooks in.
Happy to hear more thoughts on this and to pivot to the other approach for production runs!

Collaborator (@JCZuurmond, Aug 9, 2022):
I understand this is useful during development, though it is unexpected behavior to me. This does not happen for the SQL models (we could also upload the SQL in a notebook and run the notebook as a job). And it requires a user for the production system, which was not required before.
I would use a certain convention, for example uploading the scripts to dbfs:/dbt/<project name>/<database name>/<model name>.py. This would be similar to the database field in the profile being used as a prefix for your schema name. It eliminates the need for the user field and mimics existing dbt behavior, like the location in external tables.
And maybe the create dirs is not needed; I don't think it is for the spark_python_task.

Contributor Author (@ChenyuLInx, Aug 10, 2022):
Created #424 for this, feel free to update that issue! Thank you so much for the feedback!
(See the spark_python_task sketch after this file's diff.)


"notebook_path": f"{work_dir}/{identifier}",
},
},
)
if submit_response.status_code != 200:
raise dbt.exceptions.RuntimeException(
f"Error creating python run.\n {response.content!r}"
)

# poll until job finish
state = None
start = time.time()
run_id = submit_response.json()["run_id"]
terminal_states = ["TERMINATED", "SKIPPED", "INTERNAL_ERROR"]
while state not in terminal_states and time.time() - start < timeout:
time.sleep(1)
resp = requests.get(
f"https://{self.connections.profile.credentials.host}"
f"/api/2.1/jobs/runs/get?run_id={run_id}",
headers=auth_header,
)
json_resp = resp.json()
state = json_resp["state"]["life_cycle_state"]
# logger.debug(f"Polling.... in state: {state}")
if state != "TERMINATED":
raise dbt.exceptions.RuntimeException(
"python model run ended in state"
f"{state} with state_message\n{json_resp['state']['state_message']}"
)

# get end state to return to user
run_output = requests.get(
f"https://{self.connections.profile.credentials.host}"
f"/api/2.1/jobs/runs/get-output?run_id={run_id}",
headers=auth_header,
)
json_run_output = run_output.json()
result_state = json_run_output["metadata"]["state"]["result_state"]
if result_state != "SUCCESS":
raise dbt.exceptions.RuntimeException(
"Python model failed with traceback as:\n"
"(Note that the line number here does not "
"match the line number in your code due to dbt templating)\n"
f"{json_run_output['error_trace']}"
)
return self.connections.get_response(None)

def standardize_grants_dict(self, grants_table: agate.Table) -> dict:
grants_dict: Dict[str, List[str]] = {}
for row in grants_table:
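To make the alternative raised in the review thread above concrete: a minimal, hypothetical sketch of submitting the compiled code as a spark_python_task (uploading the script to DBFS first instead of importing a workspace notebook) could look roughly like this. The DBFS path convention and the helper's signature are assumptions drawn from the discussion, not part of this PR.

import base64
import requests

def submit_spark_python_task(credentials, project, schema, identifier, compiled_code, auth_header):
    # Hypothetical sketch: upload the compiled code to DBFS following the
    # convention suggested in the review thread, then submit a one-time run.
    dbfs_path = f"dbfs:/dbt/{project}/{schema}/{identifier}.py"

    # Upload via the DBFS API (2.0/dbfs/put accepts base64-encoded contents).
    response = requests.post(
        f"https://{credentials.host}/api/2.0/dbfs/put",
        headers=auth_header,
        json={
            "path": dbfs_path.replace("dbfs:", ""),
            "contents": base64.b64encode(compiled_code.encode()).decode(),
            "overwrite": True,
        },
    )
    response.raise_for_status()

    # Submit the run as a spark_python_task instead of a notebook_task.
    submit_response = requests.post(
        f"https://{credentials.host}/api/2.1/jobs/runs/submit",
        headers=auth_header,
        json={
            "run_name": f"dbt python model {identifier}",
            "existing_cluster_id": credentials.cluster,
            "spark_python_task": {"python_file": dbfs_path},
        },
    )
    submit_response.raise_for_status()
    return submit_response.json()["run_id"]
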
61 changes: 36 additions & 25 deletions dbt/include/spark/macros/adapters.sql
@@ -117,35 +117,46 @@
{%- endmacro %}


{% macro create_temporary_view(relation, sql) -%}
{{ return(adapter.dispatch('create_temporary_view', 'dbt')(relation, sql)) }}
{% macro create_temporary_view(relation, compiled_code) -%}
{{ return(adapter.dispatch('create_temporary_view', 'dbt')(relation, compiled_code)) }}
{%- endmacro -%}

{#-- We can't use temporary tables with `create ... as ()` syntax #}
{% macro spark__create_temporary_view(relation, sql) -%}
create temporary view {{ relation.include(schema=false) }} as
{{ sql }}
{% endmacro %}
{#-- We can't use temporary tables with `create ... as ()` syntax --#}
{% macro spark__create_temporary_view(relation, compiled_code) -%}
create temporary view {{ relation.include(schema=false) }} as
{{ compiled_code }}
{%- endmacro -%}


{% macro spark__create_table_as(temporary, relation, sql) -%}
{% if temporary -%}
{{ create_temporary_view(relation, sql) }}
{%- else -%}
{% if config.get('file_format', validator=validation.any[basestring]) == 'delta' %}
create or replace table {{ relation }}
{% else %}
create table {{ relation }}
{% endif %}
{{ file_format_clause() }}
{{ options_clause() }}
{{ partition_cols(label="partitioned by") }}
{{ clustered_cols(label="clustered by") }}
{{ location_clause() }}
{{ comment_clause() }}
as
{{ sql }}
{%- endif %}
{%- macro spark__create_table_as(temporary, relation, compiled_code, language='sql') -%}
Contributor:
The changes in this macro seem to be in dbt-databricks.

Contributor:
Maybe dbt-spark should raise an error when language == 'python'?

{%- if language == 'sql' -%}
{%- if temporary -%}
{{ create_temporary_view(relation, compiled_code) }}
{%- else -%}
{% if config.get('file_format', validator=validation.any[basestring]) == 'delta' %}
create or replace table {{ relation }}
{% else %}
create table {{ relation }}
{% endif %}
{{ file_format_clause() }}
{{ options_clause() }}
{{ partition_cols(label="partitioned by") }}
{{ clustered_cols(label="clustered by") }}
{{ location_clause() }}
{{ comment_clause() }}
as
{{ compiled_code }}
{%- endif -%}
{%- elif language == 'python' -%}
{#--
N.B. Python models _can_ write to temp views HOWEVER they use a different session
and have already expired by the time they need to be used (I.E. in merges for incremental models)

TODO: Deep dive into spark sessions to see if we can reuse a single session for an entire
dbt invocation.
--#}

Collaborator:
Isn't this a result of using jobs? I think each job always has a different Spark session.

Contributor Author:
This is the comment from @iknox-fa.
The main issue here is that the Python part of the model build gets its session from the job, but the rest of the logic for the model runs in another session. We have to delete the Python temp table after the merge logic (using existing SQL), whereas if it were all SQL we could just make a true temp table and it would be gone after the current dbt model finishes.

{{ py_write_table(compiled_code=compiled_code, target_relation=relation) }}
{%- endif -%}
{%- endmacro -%}


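The session issue noted in the comment above can be illustrated with a small PySpark sketch (illustrative only, not code from this PR): a temporary view registered in one session is not visible from another session, which is why the Python-created relation has to be a real table that gets dropped afterwards.

from pyspark.sql import SparkSession

# Session used by the submitted Python job (e.g. the notebook run).
job_session = SparkSession.builder.getOrCreate()
job_session.range(10).createOrReplaceTempView("tmp_python_model")

# The SQL that runs afterwards effectively uses a different session.
other_session = job_session.newSession()

# Temp views are scoped to the session that created them, so this raises
# an AnalysisException ("table or view not found").
try:
    other_session.sql("SELECT * FROM tmp_python_model").show()
except Exception as err:
    print(f"not visible from the other session: {err}")
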
@@ -1,5 +1,4 @@
{% materialization incremental, adapter='spark' -%}

{#-- Validate early so we don't run SQL if the file_format + strategy combo is invalid --#}
{%- set raw_file_format = config.get('file_format', default='parquet') -%}
{%- set raw_strategy = config.get('incremental_strategy') or 'append' -%}
@@ -8,43 +7,63 @@
{%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%}
{%- set strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%}

{#-- Set vars --#}

{%- set unique_key = config.get('unique_key', none) -%}
{%- set partition_by = config.get('partition_by', none) -%}

{%- set full_refresh_mode = (should_full_refresh()) -%}

{% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %}

{% set target_relation = this %}
{% set existing_relation = load_relation(this) %}
{% set tmp_relation = make_temp_relation(this) %}

{% if strategy == 'insert_overwrite' and partition_by %}
{% call statement() %}
{%- set language = model['language'] -%}
{%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}
{%- set target_relation = this -%}
{%- set existing_relation = load_relation(this) -%}
{%- set tmp_relation = make_temp_relation(this) -%}

{#-- Set Overwrite Mode --#}
{%- if strategy == 'insert_overwrite' and partition_by -%}
{%- call statement() -%}
set spark.sql.sources.partitionOverwriteMode = DYNAMIC
{% endcall %}
{% endif %}
{%- endcall -%}
{%- endif -%}

{#-- Run pre-hooks --#}
{{ run_hooks(pre_hooks) }}

{% set is_delta = (file_format == 'delta' and existing_relation.is_delta) %}

{% if existing_relation is none %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% elif existing_relation.is_view or full_refresh_mode %}
{#-- Incremental run logic --#}
{%- if existing_relation is none -%}
{#-- Relation must be created --#}
{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}
{%- elif existing_relation.is_view or should_full_refresh() -%}
{#-- Relation must be dropped & recreated --#}
{% set is_delta = (file_format == 'delta' and existing_relation.is_delta) %}
{% if not is_delta %} {#-- If Delta, we will `create or replace` below, so no need to drop --#}
{% do adapter.drop_relation(existing_relation) %}
{% endif %}
{% set build_sql = create_table_as(False, target_relation, sql) %}
{% else %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% set build_sql = dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %}
{% endif %}

{%- call statement('main') -%}
{{ build_sql }}
{%- endcall -%}
{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}
{%- else -%}
{#-- Relation must be merged --#}
{%- call statement('create_tmp_relation', language=language) -%}
{{ create_table_as(True, tmp_relation, compiled_code, language) }}
{%- endcall -%}
{%- do process_schema_changes(on_schema_change, tmp_relation, existing_relation) -%}
{%- call statement('main') -%}
{{ dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) }}
{%- endcall -%}
{%- if language == 'python' -%}
{#--
This is yucky.
See note in dbt-spark/dbt/include/spark/macros/adapters.sql
re: python models and temporary views.

Also, why doesn't either drop_relation or adapter.drop_relation work here?!
--#}
{% call statement('drop_relation') -%}
drop table if exists {{ tmp_relation }}
{%- endcall %}
{%- endif -%}
{%- endif -%}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke) %}
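For reference, an incremental Python model targeting this materialization might look roughly like the following. This is a hedged sketch using dbt's dbt.is_incremental and dbt.this conventions; the upstream ref and the event_ts column are made up.

def model(dbt, session):
    dbt.config(materialized="incremental")

    # Hypothetical upstream model resolved to a Spark DataFrame.
    df = dbt.ref("upstream_events")

    if dbt.is_incremental:
        # Only keep rows newer than what is already in the target table.
        max_ts = session.table(f"{dbt.this}").agg({"event_ts": "max"}).collect()[0][0]
        if max_ts is not None:
            df = df.filter(df.event_ts > max_ts)

    return df
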
2 changes: 1 addition & 1 deletion dbt/include/spark/macros/materializations/snapshot.sql
@@ -117,7 +117,7 @@

{% if not target_relation_exists %}

{% set build_sql = build_snapshot_table(strategy, model['compiled_sql']) %}
{% set build_sql = build_snapshot_table(strategy, model['compiled_code']) %}
{% set final_sql = create_table_as(False, target_relation, build_sql) %}

{% else %}
24 changes: 20 additions & 4 deletions dbt/include/spark/macros/materializations/table.sql
@@ -1,5 +1,5 @@
{% materialization table, adapter = 'spark' %}

{%- set language = model['language'] -%}
{%- set identifier = model['alias'] -%}
{%- set grant_config = config.get('grants') -%}

@@ -19,9 +19,10 @@
{%- endif %}

-- build model
{% call statement('main') -%}
{{ create_table_as(False, target_relation, sql) }}
{%- endcall %}

{%- call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall -%}

{% set should_revoke = should_revoke(old_relation, full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config, should_revoke) %}
@@ -33,3 +34,18 @@
{{ return({'relations': [target_relation]})}}

{% endmaterialization %}


{% macro py_write_table(compiled_code, target_relation) %}
{{ compiled_code }}
# --- Autogenerated dbt materialization code. --- #
dbt = dbtObj(spark.table)
Contributor:
Where does spark.table come from? Should we use databricks.table in dbt-databricks?

Contributor Author:
This is just a Python function later used as dbt_load_df_function to load data:

def ref(*args, dbt_load_df_function):
    refs = {{ ref_dict | tojson }}
    key = ".".join(args)
    return dbt_load_df_function(refs[key])

This works with the current notebook submit solution. I haven't tested databricks.table. Do you think it will perform better than spark.table?

Contributor:
Ah, got it, spark is the one on the notebook. Then it should be fine. Thanks.

Collaborator (@JCZuurmond, Aug 8, 2022):
I think we can make spark more explicit, and thus not expect the notebook to magically insert this global variable, by adding the following above {{ compiled_code }}:

from pyspark.sql import SparkSession
session = SparkSession.builder.getOrCreate()

df = model(dbt, spark)
df.write.mode("overwrite").format("delta").saveAsTable("{{ target_relation }}")
{%- endmacro -%}

{%macro py_script_comment()%}
# how to execute python model in notebook
# dbt = dbtObj(spark.table)
# df = model(dbt, spark)
{%endmacro%}
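
Putting py_write_table together with the reviewer's suggestion above, the rendered tail of a submitted script for a hypothetical model could look like the following. This is an illustrative sketch: the table name is made up, and the explicit SparkSession lines reflect the suggested change rather than what the macro currently emits.

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# ... compiled user model code (def model(dbt, session): ...) is rendered above ...

# --- Autogenerated dbt materialization code. --- #
dbt = dbtObj(spark.table)
df = model(dbt, spark)
df.write.mode("overwrite").format("delta").saveAsTable("my_schema.my_python_model")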