
Support Standard SQL in BigQuery Sensor #13750

Closed
omarismail94 opened this issue Jan 18, 2021 · 12 comments · Fixed by #18431
Labels
good first issue, kind:feature, provider:google

Comments

@omarismail94

Description

A SQL sensor that uses standard SQL, since the default one uses legacy SQL.

Use case / motivation

Currently (correct me if I am wrong!), the SQL sensor only supports legacy SQL. If I want to poke a BigQuery table, I do not think I can do that using standard SQL right now.

Are you willing to submit a PR?

If the community approves of this idea, sure!

@omarismail94 added the kind:feature label on Jan 18, 2021
@boring-cyborg

boring-cyborg bot commented Jan 18, 2021

Thanks for opening your first issue here! Be sure to follow the issue template!

@mik-laj added the provider:google label on Jan 19, 2021
@mik-laj changed the title from "Standard SQL sensor" to "Support Standard SQL in BigQuery Sensor" on Jan 19, 2021
@mik-laj
Member

mik-laj commented Jan 19, 2021

Can you provide an example of the actual usage? What sensors are you talking about?

@eladkal
Contributor

eladkal commented Jan 19, 2021

Isn't SqlSensor compatible with BigQuery?

@sb7f9

sb7f9 commented Jan 25, 2021

@eladkal the standard SqlSensor is not compatible with BigQuery because by default it uses legacy SQL, which does not support many standard SQL features (such as querying partitioned tables). There is no parameter like use_legacy_sql=False available to override the setting and enforce standard SQL (BigQueryOperator has this functionality).
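
For context, the operator-side knob mentioned above looks roughly like this; a minimal sketch using the Google provider's BigQueryExecuteQueryOperator, where the task id, connection id, and query are placeholders:

from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

# The operator exposes use_legacy_sql directly; the plain SqlSensor has no equivalent parameter.
check_table = BigQueryExecuteQueryOperator(
    task_id="check_table",                              # placeholder task id
    gcp_conn_id="google_cloud_default",                 # placeholder connection id
    sql="SELECT COUNT(*) FROM `my_dataset.my_table`",   # placeholder standard SQL query
    use_legacy_sql=False,
)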

@omarismail94
Author

@mik-laj Yeah, you have to do something like this:

# Imports assume Airflow 2.x with the Google provider package installed
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class BigQuerySqlSensor(BaseSensorOperator):
    template_fields = ('sql',)
    template_ext = ('.sql',)

    @apply_defaults
    def __init__(
        self,
        bigquery_conn_id='bigquery_conn_id',
        delegate_to=None,
        location='US',
        sql=None,
        use_legacy_sql=False,
        **kwargs):

        self.bigquery_conn_id = bigquery_conn_id
        self.delegate_to = delegate_to
        self.location = location
        self.sql = sql
        self.use_legacy_sql = use_legacy_sql
        super().__init__(**kwargs)

    def poke(self, context):
        hook = BigQueryHook(
            bigquery_conn_id=self.bigquery_conn_id,
            delegate_to=self.delegate_to,
            location=self.location,
            use_legacy_sql=self.use_legacy_sql,
        )
        connection = hook.get_conn()
        cursor = connection.cursor()
        cursor.execute(self.sql)

        # Succeed as soon as any returned row reports a non-zero count in its first column
        for row in cursor.fetchall():
            self.log.info("printing rows ...")
            self.log.info(row)
            row_count = row[0]
            self.log.info("rows printed.")

            if row_count > 0:
                return True

        return False
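
A usage sketch for the custom sensor above (the connection id, query, and timings are placeholder values):

wait_for_rows = BigQuerySqlSensor(
    task_id="wait_for_rows",
    bigquery_conn_id="bigquery_default",   # placeholder connection id
    location="US",
    sql="SELECT COUNT(*) FROM `my_dataset.my_table` WHERE ds = '{{ ds }}'",  # placeholder query
    poke_interval=300,    # re-check every 5 minutes
    timeout=60 * 60,      # give up after an hour
)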

@lewis-anderson53

I struggled with this a while ago; I ended up creating a custom sensor that uses the BigQuery hook directly and lets me specify the parameters, very similar to the code that @omarismail94 shared.

The "easy" option would be to change the default value of use_legacy_sql on the BigQueryHook to False, but I would guess a change like that needs some consideration.
I think there is justification for it: legacy SQL hasn't been the default in Google BigQuery since 2016, so five years on I'd expect the vast majority of BigQuery projects to use standard SQL.

If a change like that isn't viable, perhaps we could look into extending the behaviour of the Connection.get_hook() method, which the SqlSensor calls to get its hook. Right now it just returns a hook with the default parameters; it could be made more flexible so that the SqlSensor can pass custom parameters through to the returned hook.

e.g.

my_sensor = SqlSensor(
    ...
    hook_parameters={"use_legacy_sql": False},
    ...
)

@dinigo
Contributor

dinigo commented Jul 29, 2021

In case someone comes across this looking for a solution: I solved it with this quick fix.

from airflow.models import Variable
from airflow.sensors.sql import SqlSensor


class BigQuerySqlSensor(SqlSensor):
    """Override use_legacy_sql when using SqlSensor with a BigQuery connection"""

    def _get_hook(self):
        hook = super()._get_hook()
        hook.use_legacy_sql = False
        hook.location = Variable.get('location')  # BigQuery location stored as an Airflow Variable
        return hook


sense_data = BigQuerySqlSensor(
    task_id="sense_data",
    conn_id="google_default",
    sql="select count(*) > 0 from `my_dataset.my_view`",
)

Apart from this quick fix, I see the point of defaulting to the legacy dialect to keep backwards compatibility. I see two possible solutions for this issue:

  1. Expose and read the dialect from the connection extras
  2. Set the flag dynamically from the SQL code if it starts with #standardSQL, as the docs mention (a rough sketch follows below)

Both of them can be implemented. Any thoughts on which would be preferable?
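
For illustration only, option 2 could look roughly like this; a sketch that reuses the SqlSensor-subclass approach from the quick fix above, assumes the hook exposes a use_legacy_sql attribute, and uses a hypothetical class name:

from airflow.sensors.sql import SqlSensor


class DialectAwareBigQuerySensor(SqlSensor):
    """Sketch: choose the BigQuery dialect from a #standardSQL / #legacySQL prefix in the query."""

    def _get_hook(self):
        hook = super()._get_hook()
        lines = self.sql.lstrip().splitlines()
        first_line = lines[0].strip().lower() if lines else ""
        if first_line == "#standardsql":
            hook.use_legacy_sql = False
        elif first_line == "#legacysql":
            hook.use_legacy_sql = True
        return hook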

@uranusjr
Member

We should probably support passing arguments from a sensor to the underlying hook. Something like this?

sense_data = SqlSensor(
    task_id="sense_data",
    conn_id="google_default",
    hook_kwargs={"use_legacy_sql": False},
    sql="select count(*) > 0 from `my_dataset.my_view`",
)

@lewis-anderson53

I quite like the hook_kwargs option too, as it would solve default-parameter issues for other DB hooks as well, not just the BigQuery one.

@dinigo
Contributor

dinigo commented Aug 5, 2021

OK, that's another option; it was suggested with an example piece of code in #17315:

# airflow/sensors/sql.py
class SqlSensor(BaseSensorOperator):
    def __init__(
        self, *, conn_id, sql, hook_kwargs: Optional[Dict] = None, parameters=None, success=None, failure=None, fail_on_empty=False, **kwargs
    ):
        self.conn_id = conn_id
        # init all the params...
        self.hook_kwargs = hook_kwargs or {}
        super().__init__(**kwargs)

    def _get_hook(self):
        conn = BaseHook.get_connection(self.conn_id)
        # ...
        return conn.get_hook(**self.hook_kwargs)

And in the connection

# airflow/models/connection.py
class Connection(Base, LoggingMixin):
    # ...
    def get_hook(self, **kwargs):
        """Return hook based on conn_type."""
        # locate hook class ...
        return hook_class(**{conn_id_param: self.conn_id}, **kwargs)

Then we can write the code @uranusjr suggests:

sense_data = SqlSensor(
    task_id="sense_data",
    conn_id="google_default",
    hook_kwargs={"use_legacy_sql": False},
    sql="select count(*) > 0 from `my_dataset.my_view`",
)

@uranusjr
Member

Anyone fancy a pull request? Sounds not too complicated to me!

@dinigo
Contributor

dinigo commented Aug 12, 2021

WIP: I have something, but I'm still struggling with the tests. I'll open the PR so we can review it collectively.
