Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for job_parameters and dbt_commands in DatabricksRunNow Operator #43895

Merged
merged 1 commit into from
Nov 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ class DatabricksRunNowOperator(BaseOperator):

json = {
"job_id": 42,
"notebook_params": {"dry-run": "true", "oldest-time-to-consider": "1457570074236"},
"job_parameters": {"dry-run": "true", "oldest-time-to-consider": "1457570074236"},
}

notebook_run = DatabricksRunNowOperator(task_id="notebook_run", json=json)
Expand All @@ -688,6 +688,8 @@ class DatabricksRunNowOperator(BaseOperator):

job_id = 42

dbt_commands = ["dbt deps", "dbt seed", "dbt run"]

notebook_params = {"dry-run": "true", "oldest-time-to-consider": "1457570074236"}

python_params = ["douglas adams", "42"]
Expand All @@ -698,6 +700,7 @@ class DatabricksRunNowOperator(BaseOperator):

notebook_run = DatabricksRunNowOperator(
job_id=job_id,
dbt_commands=dbt_commands,
notebook_params=notebook_params,
python_params=python_params,
jar_params=jar_params,
Expand All @@ -711,7 +714,9 @@ class DatabricksRunNowOperator(BaseOperator):
Currently the named parameters that ``DatabricksRunNowOperator`` supports are
- ``job_id``
- ``job_name``
- ``job_parameters``
- ``json``
- ``dbt_commands``
- ``notebook_params``
- ``python_params``
- ``python_named_parameters``
Expand All @@ -731,6 +736,17 @@ class DatabricksRunNowOperator(BaseOperator):
    There must be exactly one job with the specified name.
``job_id`` and ``job_name`` are mutually exclusive.
This field will be templated.

    :param job_parameters: A dict from keys to values that override or augment the job's
        parameters for this run. Job parameters are passed to any of the job's tasks that
        accept key-value parameters. ``job_parameters`` supersedes ``notebook_params``,
        ``python_params``, ``python_named_parameters``, ``jar_params``, and
        ``spark_submit_params``, and cannot be used in combination with any of them.
This field will be templated.

.. seealso::
https://docs.databricks.com/en/workflows/jobs/settings.html#add-parameters-for-all-job-tasks

:param json: A JSON object containing API parameters which will be passed
directly to the ``api/2.1/jobs/run-now`` endpoint. The other named parameters
(i.e. ``notebook_params``, ``spark_submit_params``..) to this operator will
Expand All @@ -741,6 +757,13 @@ class DatabricksRunNowOperator(BaseOperator):
.. seealso::
For more information about templating see :ref:`concepts:jinja-templating`.
https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsRunNow

:param dbt_commands: A list containing the dbt commands to run using the dbt command line
interface. This field will be templated.

.. seealso::
https://docs.databricks.com/en/jobs/dbt.html

:param notebook_params: A dict from keys to values for jobs with notebook task,
e.g. "notebook_params": {"name": "john doe", "age": "35"}.
The map is passed to the notebook and will be accessible through the
Expand Down Expand Up @@ -832,7 +855,9 @@ def __init__(
*,
job_id: str | None = None,
job_name: str | None = None,
job_parameters: dict[str, str] | None = None,
json: Any | None = None,
dbt_commands: list[str] | None = None,
notebook_params: dict[str, str] | None = None,
python_params: list[str] | None = None,
jar_params: list[str] | None = None,
Expand Down Expand Up @@ -884,6 +909,10 @@ def __init__(
self.json["spark_submit_params"] = spark_submit_params
if idempotency_token is not None:
self.json["idempotency_token"] = idempotency_token
if job_parameters is not None:
self.json["job_parameters"] = job_parameters
if dbt_commands is not None:
self.json["dbt_commands"] = dbt_commands
if self.json:
self.json = normalise_json_content(self.json)
# This variable will be used in case our task gets killed.
Expand Down
5 changes: 5 additions & 0 deletions providers/tests/databricks/operators/test_databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
JOB_ID = "42"
JOB_NAME = "job-name"
JOB_DESCRIPTION = "job-description"
DBT_COMMANDS = ["dbt deps", "dbt seed", "dbt run"]
NOTEBOOK_PARAMS = {"dry-run": "true", "oldest-time-to-consider": "1457570074236"}
JAR_PARAMS = ["param1", "param2"]
RENDERED_TEMPLATED_JAR_PARAMS = [f"/test-{DATE}"]
Expand Down Expand Up @@ -1179,6 +1180,7 @@ def test_init_with_json(self):
Test the initializer with json data.
"""
json = {
"dbt_commands": DBT_COMMANDS,
"notebook_params": NOTEBOOK_PARAMS,
"jar_params": JAR_PARAMS,
"python_params": PYTHON_PARAMS,
Expand All @@ -1190,6 +1192,7 @@ def test_init_with_json(self):

expected = utils.normalise_json_content(
{
"dbt_commands": DBT_COMMANDS,
"notebook_params": NOTEBOOK_PARAMS,
"jar_params": JAR_PARAMS,
"python_params": PYTHON_PARAMS,
Expand All @@ -1215,6 +1218,7 @@ def test_init_with_merging(self):
task_id=TASK_ID,
json=json,
job_id=JOB_ID,
dbt_commands=DBT_COMMANDS,
notebook_params=override_notebook_params,
python_params=PYTHON_PARAMS,
jar_params=override_jar_params,
Expand All @@ -1223,6 +1227,7 @@ def test_init_with_merging(self):

expected = utils.normalise_json_content(
{
"dbt_commands": DBT_COMMANDS,
"notebook_params": override_notebook_params,
"jar_params": override_jar_params,
"python_params": PYTHON_PARAMS,
Expand Down