
feat: SQLExecuteQueryOperator sql_callable #35844

Conversation

@raphaelauv (Contributor) commented Nov 24, 2023

How should I manage typing, since mypy fails with:

error: Argument "sql" to
"generate_openlineage_metadata_from_sql" of "SQLParser" has incompatible type
"Union[str, List[str], None]"; expected "Union[List[str], str]"  [arg-type]

Any elegant ideas? Thanks
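For illustration only (not the PR's actual code): the conventional way to satisfy mypy here is to narrow the `Optional` away before the call. The function names below are stand-ins, assuming the `sql` attribute has type `Union[str, List[str], None]` as in the error above:

```python
from typing import List, Optional, Union


def generate_openlineage_metadata_from_sql(sql: Union[List[str], str]) -> str:
    # Stand-in for SQLParser.generate_openlineage_metadata_from_sql,
    # just so the typing pattern is runnable here.
    return sql if isinstance(sql, str) else "; ".join(sql)


def parse_lineage(sql: Optional[Union[str, List[str]]]) -> str:
    # Narrow Union[str, List[str], None] -> Union[str, List[str]]
    # with an explicit branch; mypy accepts this kind of narrowing.
    if sql is None:
        raise ValueError("sql must be set before parsing lineage")
    return generate_openlineage_metadata_from_sql(sql)


print(parse_lineage(["select 1", "select 2"]))  # select 1; select 2
```

`typing.cast` works too when `None` is known to be impossible at the call site, but the explicit check also gives a clearer runtime error.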

@Taragolis (Contributor)

I don't think this is required. A good built-in replacement for exactly this already exists for every operator parameter that supports templated fields: PythonOperator -> SQLExecuteQueryOperator, or taskflow -> SQLExecuteQueryOperator.

@raphaelauv (Contributor, Author)

The limitation of the current implementation is that the fully templated query is not available in the Airflow UI or logs:

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from pendulum import today

from utils_dags import UTILS_SQL_PATH_RELATIVE

with DAG(
    dag_id="test_sql_templating",
    schedule_interval="0 0 * * *",
    start_date=today("UTC").add(days=-1),
    template_searchpath=[UTILS_SQL_PATH_RELATIVE],
):
    SQLExecuteQueryOperator(
        task_id="toto",
        conn_id="airflow_db",
        sql="toto.sql",  # resolved via template_searchpath
        parameters={"tutu": "{{ ds }}"},
    )

where the toto.sql file contains

select * from job where dag_id != %(tutu)s

[screenshot of the rendered template in the Airflow UI]

Also, if I want to use a function to apply logic to the toto.sql file (for example, wrapping the query to create a Redshift UNLOAD query), it's impossible.

@raphaelauv (Contributor, Author)

@Taragolis thanks for the advice. It works, but it means two operators and it's "less friendly":

def build_sql(date):
    return "select * from job where dag_id != '{date}'".format(date=date)


build_sql_task = PythonOperator(
    task_id="build_sql",
    python_callable=build_sql,
    op_kwargs={"date": "{{ ds }}"},
)

run_sql_task = SQLExecuteQueryOperator(
    task_id="toto",
    conn_id="airflow_db",
    sql="{{ task_instance.xcom_pull(task_ids='build_sql') }}",
)
build_sql_task >> run_sql_task
build_sql_task.as_setup()

If this pattern is considered the preferable solution, then I could at least create a PR to add an example. WDYT?

@raphaelauv force-pushed the feat/sqlexecutequeryoperator_sql_callable branch from cf5dc9e to 122d768 on November 25, 2023 00:35
@raphaelauv force-pushed the feat/sqlexecutequeryoperator_sql_callable branch from 122d768 to b8618a3 on November 25, 2023 01:23
@eladkal (Contributor) commented Nov 25, 2023

I am not sure the suggestion simplifies things; I am worried that it might cause more confusion. I replied to a similar problem in https://stackoverflow.com/a/72246305/14624409

The reason I don't think it simplifies things is that for most use cases users want to render the values in Airflow, not via %(tutu)s as you show. They go that way because they find confusing information about how to template.
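To make the distinction concrete, here is a stand-alone sketch of driver-side bind parameters, the mechanism behind the operator's `parameters` argument. It uses stdlib sqlite3 rather than an Airflow hook, so placeholder syntax differs: sqlite3 uses `:name`, while psycopg2 uses `%(name)s`:

```python
import sqlite3

# In-memory database standing in for the Airflow metadata DB.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE job (dag_id TEXT)")
conn.executemany("INSERT INTO job VALUES (?)", [("dag_a",), ("dag_b",)])

# The driver substitutes the value at execution time, so the SQL text --
# which is what the Airflow UI renders -- still shows only the placeholder.
query = "SELECT dag_id FROM job WHERE dag_id != :tutu"
rows = conn.execute(query, {"tutu": "dag_a"}).fetchall()
print(rows)  # [('dag_b',)]
```

Jinja rendering, by contrast, rewrites the SQL string itself before execution, which is why values templated that way do show up in the rendered field.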

Also, if I want to use a function to apply logic to the toto.sql file (for example, wrapping the query to create a Redshift UNLOAD query), it's impossible.

Can you share such an example with Redshift?

@raphaelauv (Contributor, Author)

I would like to render values with Airflow, but params can't be templated, and I don't want to put Airflow templating directly in the .sql file (for separation of concerns, and to keep the templating logic only in the DAG rather than duplicating it between the .sql and .py files).

So maybe the solution would be to "cheat" and show, in the UI and the logs, the query after the parameter values have been substituted, so it's more friendly.

@raphaelauv (Contributor, Author) commented Nov 26, 2023

The "unload" I do:

def unload_sql(sql_select_query: str, s3_export_path: str) -> str:
    # escape single quotes so the query can be embedded in the UNLOAD literal
    sql_select_query = sql_select_query.replace("'", "''")
    return f"""
UNLOAD
('
{sql_select_query}
')
...
...
"""


def build_sql(date):
    with open(os.path.join(UTILS_SQL_PATH, "aaaaaaa.sql")) as sql_file:
        s3_select_query = sql_file.read()
    # use the already-rendered `date` argument; a literal "{{ ds }}" here
    # would not be templated again
    s3_path = f"s3://aaaaaaaaaaaaa/{date}/"
    return unload_sql(s3_select_query, s3_path)


build_sql_task = PythonOperator(
    task_id="build_sql",
    python_callable=build_sql,
    op_kwargs={"date": "{{ ds }}"},
)

run_sql_task = SQLExecuteQueryOperator(
    task_id="toto",
    conn_id="airflow_db",
    sql="{{ task_instance.xcom_pull(task_ids='build_sql') }}",
)
build_sql_task >> run_sql_task
build_sql_task.as_setup()

@Taragolis (Contributor)

A couple of solutions already exist; note that this list is not exhaustive:

Solution 1:
Jinja templates. Some complex cases are better solved with params (not to be confused with parameters) and files instead of a single string.

Solution 2:
As I mentioned, XCom via an upstream task: a regular PythonOperator / taskflow task.

Solution 3:
A custom operator.

Solution 4:
A taskflow task + Hook.

What I don't like about the PR's proposed solution: it is limited to one field of one operator, and no arguments / context are provided to the callable itself. However, if we add additional parameters and context, there would be no difference from PythonOperator. There was also an attempt to add this approach to the new LLM-based providers, but the final decision was to use built-in Airflow abilities instead of three additional parameters to replace one argument.

One benefit of a callable in the operator, compared to putting/getting the result via XCom and an upstream task, is less overhead from creating a new task, which usually depends on the Executor type. But in this case I think it is better to resolve it globally rather than on one specific operator. For example, it could be some specific type which would call the callable during field rendering, e.g. the same approach as implemented for LiteralValue.
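A minimal sketch of "Solution 3: a custom operator". The two-line stub below stands in for SQLExecuteQueryOperator so the sketch runs without an Airflow installation; all class and function names here are illustrative, not Airflow API:

```python
from typing import Callable


class StubSQLOperator:
    # Stand-in for SQLExecuteQueryOperator; the real operator
    # templates `sql` and runs it against the connection.
    def __init__(self, *, task_id: str, sql: str):
        self.task_id = task_id
        self.sql = sql


class CallableSQLOperator(StubSQLOperator):
    # Thin subclass that invokes the callable up front, so the final
    # query string lands in the `sql` field and is therefore visible
    # in the UI / logs -- the effect the PR was after.
    def __init__(self, *, sql_callable: Callable[[], str], **kwargs):
        super().__init__(sql=sql_callable(), **kwargs)


def build_unload() -> str:
    return "UNLOAD ('select 1') TO 's3://bucket/{{ ds }}/'"


op = CallableSQLOperator(task_id="toto", sql_callable=build_unload)
print(op.sql)  # UNLOAD ('select 1') TO 's3://bucket/{{ ds }}/'
```

Since `sql` is a templated field on the real operator, any Jinja left in the returned string (like `{{ ds }}` above) would still be rendered at task run time.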

@raphaelauv (Contributor, Author) commented Nov 26, 2023

I agree that a general solution for all operators is better than adding the feature to that specific operator.

So the main question we have to agree on is the recommended pattern:

  1. a PythonOperator before the main operator

or

  2. we add to BaseOperator the possibility to pass a callable to customize the templating logic

If we choose number 1, then I will add documentation about it, because doing it correctly is not obvious to everybody.

@potiuk (Member) commented Dec 11, 2023

  2. we add to BaseOperator the possibility to pass a callable to customize the templating logic

I would very much prefer this (the main reason is that PythonOperator -> other operator is complex to write and maintain, introduces a completely unnecessary task in the DAG, and is way slower, especially in the case of the K8S executor).

I think it's just wasteful (resource-wise) to use PythonOperator just to run a callable to feed another operator via XCom.


This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Jan 26, 2024
@github-actions github-actions bot removed the stale Stale PRs per the .github/workflows/stale.yml policy file label Jan 27, 2024

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Mar 13, 2024
@raphaelauv raphaelauv closed this Mar 16, 2024
@Taragolis (Contributor)

I would very much prefer this (the main reason is that PythonOperator -> other operator is complex to write and maintain, introduces a completely unnecessary task in the DAG, and is way slower, especially in the case of the K8S executor).

Necroposting: this does not necessarily have to be part of BaseOperator; it could be a subclass of ResolveMixin, implemented the same way as LiteralValue.
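To illustrate the ResolveMixin idea, here is a self-contained mock: a wrapper whose `resolve()` the templater would call with the task context while rendering template fields, the way Airflow does for LiteralValue. The names `CallableValue` and `render_template_field` are made up for this sketch and are not Airflow API:

```python
from typing import Any, Callable, Dict


class CallableValue:
    # Hypothetical ResolveMixin-style wrapper: holds a callable that
    # receives the task context during field rendering.
    def __init__(self, fn: Callable[[Dict[str, Any]], Any]):
        self.fn = fn

    def resolve(self, context: Dict[str, Any]) -> Any:
        return self.fn(context)


def render_template_field(value: Any, context: Dict[str, Any]) -> Any:
    # Drastically simplified stand-in for Airflow's field-rendering
    # step: resolve wrappers, pass everything else through.
    if isinstance(value, CallableValue):
        return value.resolve(context)
    return value


sql = CallableValue(lambda ctx: f"select * from job where dag_id != '{ctx['ds']}'")
print(render_template_field(sql, {"ds": "2023-11-26"}))
# select * from job where dag_id != '2023-11-26'
```

Because resolution happens during rendering, the final query would appear in the rendered-template view without adding an extra task, which addresses both the UI-visibility and the overhead concerns raised above.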

@raphaelauv raphaelauv deleted the feat/sqlexecutequeryoperator_sql_callable branch May 30, 2024 15:36
Labels
area:providers provider:common-sql stale Stale PRs per the .github/workflows/stale.yml policy file