SqlSensor with Standard SQL BigQuery dialect (allow to pass config to injected (DI) hooks) #17315

Closed
dinigo opened this issue Jul 29, 2021 · 2 comments
Labels
duplicate Issue that is duplicated kind:bug This is a clearly a bug

Comments

@dinigo
Contributor

dinigo commented Jul 29, 2021

Apache Airflow version: 1.10.15, but happens in all of them

Environment: GCP Composer -> Kubernetes + Celery

  • Cloud provider or hardware configuration: Google

What happened:
Errors appear when using SqlSensor against a BigQuery view.
For context: BigQuery appears to run plain SQL, but it actually supports two dialects of the language, legacy SQL and standard SQL. For backwards compatibility, the so-called legacy SQL is the default.

I'm hot-fixing it like so:

class BigQuerySqlSensor(SqlSensor):
    """
    Overwrites hook config when using SqlSensor with a BigQuery
    connection
    """
    def _get_hook(self):
        hook = super()._get_hook()
        hook.use_legacy_sql = False
        hook.location = 'europe-west1'
        return hook
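
The same workaround can be made reusable so the overrides are not hard-coded in the method body. The following is only a sketch of that idea: `StubHook`, `StubSqlSensor`, `HookOverrideMixin` and `StandardSqlSensor` are hypothetical names used for illustration, and the stubs stand in for the real Airflow classes so the snippet runs without Airflow installed.

```python
from typing import Any, Dict


class StubHook:
    """Stand-in for the hook the sensor builds (not the real BigQueryHook)."""

    use_legacy_sql = True
    location = None


class StubSqlSensor:
    """Stand-in for SqlSensor; only the hook factory is modelled."""

    def _get_hook(self):
        return StubHook()


class HookOverrideMixin:
    """Apply attribute overrides to whatever hook the parent sensor builds."""

    hook_overrides: Dict[str, Any] = {}

    def _get_hook(self):
        hook = super()._get_hook()
        # Overwrite attributes on the hook instance after it has been created.
        for name, value in self.hook_overrides.items():
            setattr(hook, name, value)
        return hook


class StandardSqlSensor(HookOverrideMixin, StubSqlSensor):
    hook_overrides = {"use_legacy_sql": False, "location": "europe-west1"}


patched = StandardSqlSensor()._get_hook()
```

With the real sensor, the mixin would be combined with `SqlSensor` instead of the stub. It still patches the hook after construction, so it is a workaround rather than a fix.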

How to reproduce it:
The issue appears when I try to run a simple query against a view:

check_resource_has_data = SqlSensor(
    task_id="check_resource_has_data",
    conn_id='google_cloud_default',
    sql=f'SELECT COUNT(*) > 0 FROM `{dataset_id}.{resource_id}`',
)

First Error: I am using standard SQL, but the hook automatically sets the dialect to legacy, so BigQuery tries to run the query as legacy SQL, fails to parse it, and returns an error:

[2021-07-28 16:03:05,499] {taskinstance.py:1152} ERROR - BigQuery job failed. Final error was: {'reason': 'invalid', 'location': '`my-project.test.cashflow_abacus_view`', 'message': 'Invalid table name: `my-project.test.my_view`
[Try using standard SQL (https://cloud.google.com/bigquery/docs/reference/standard-sql/enabling-standard-sql)].'}. The job was: {'kind': 'bigquery#job', 'etag': 'random_etag==', 'id': 'my-project:US.job_some_randome_id_123412', 'selfLink': 'https://bigquery.googleapis.com/bigquery/v2/projects/my-project/jobs/job_some_randome_id_123412?location=US', 'user_email': '[email protected]', 'configuration': {'query': {'query': '
                    SELECT COUNT(*) > 0
                    FROM `my-project.test.my_view`
                ', 'priority': 'INTERACTIVE', 'useLegacySql': True}, 
                'jobType': 'QUERY'}, 'jobReference': {'projectId': 
                'my-project', 'jobId': 'job_some_randome_id_123412', 'location': 'SO'}, 'statistics': {'creationTime': '1627488185084', 'startTime': '1627488185109', 'endTime': '1627488185109'}, 'status': {'errorResult': {'reason': 'invalid', 'location': '`my-project.test.my_view`', 'message': 'Invalid table name: `my-project.test.my_view`
[Try using standard SQL (https://cloud.google.com/bigquery/docs/reference/standard-sql/enabling-standard-sql)].'}, 'errors': [{'reason': 'invalid', 'location': '`my-project.test.my_view`', 'message': 'Invalid table name: `my-project.test.my_view`
[Try using standard SQL (https://cloud.google.com/bigquery/docs/reference/standard-sql/enabling-standard-sql)].'}], 'state': 'DONE'}}
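
To illustrate why the table name is rejected: the two dialects quote table references differently. Standard SQL uses backticks around `project.dataset.table`, while legacy SQL uses square brackets with a colon after the project. The helper below is a hypothetical illustration of that difference (`table_ref` is not part of Airflow or of any BigQuery client). It only shows why the legacy parser rejects the backticks and is not meant as a workaround.

```python
def table_ref(project: str, dataset: str, table: str, legacy: bool = False) -> str:
    """Format a fully qualified BigQuery table reference for the given dialect."""
    if legacy:
        # Legacy SQL: square brackets, colon between project and dataset.
        return f"[{project}:{dataset}.{table}]"
    # Standard SQL: backticks, dots between all parts.
    return f"`{project}.{dataset}.{table}`"


standard_ref = table_ref("my-project", "test", "my_view")
legacy_ref = table_ref("my-project", "test", "my_view", legacy=True)
```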

Second Error: There are several ways to switch the "dialect". One of them, the closest to the code, is prepending the query with a comment line like so: # standardSQL (docs). But

What you expected to happen:

There's no way to pass information to the hook that the SqlSensor uses underneath. It would be useful to add a hook_config parameter to the SqlSensor so that the underlying hook can be configured:

# airflow/sensors/sql.py
from typing import Dict, Optional

class SqlSensor(BaseSensorOperator):
    def __init__(
        self, *, conn_id, sql, hook_config: Optional[Dict] = None, parameters=None,
        success=None, failure=None, fail_on_empty=False, **kwargs
    ):
        self.conn_id = conn_id
        # init all the params...
        # Optional keyword arguments forwarded to the hook constructor
        self.hook_config = hook_config or {}
        super().__init__(**kwargs)

    def _get_hook(self):
        conn = BaseHook.get_connection(self.conn_id)
        # ...
        return conn.get_hook(**self.hook_config)

And in the connection

# airflow/models/connection.py
class Connection(Base, LoggingMixin):
    # ...
    def get_hook(self, **kwargs):
        """Return hook based on conn_type."""
        # locate hook class ...
        return hook_class(**{conn_id_param: self.conn_id}, **kwargs)
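
Put together, the two proposed changes would let the sensor forward configuration to the hook. The snippet below is a self-contained sketch of that flow under the assumptions of this proposal. `StubBigQueryHook`, `StubConnection` and `StubSqlSensor` are hypothetical stand-ins, not the real Airflow classes, so it runs without Airflow installed.

```python
from typing import Any, Dict, Optional


class StubBigQueryHook:
    """Stand-in for a DB-API hook whose defaults the caller wants to change."""

    def __init__(self, conn_id: str, use_legacy_sql: bool = True,
                 location: Optional[str] = None):
        self.conn_id = conn_id
        self.use_legacy_sql = use_legacy_sql
        self.location = location


class StubConnection:
    """Stand-in for Connection: forwards extra kwargs to the hook constructor."""

    def __init__(self, conn_id: str):
        self.conn_id = conn_id

    def get_hook(self, **kwargs: Any) -> StubBigQueryHook:
        return StubBigQueryHook(conn_id=self.conn_id, **kwargs)


class StubSqlSensor:
    """Stand-in for SqlSensor with the proposed optional hook_config argument."""

    def __init__(self, *, conn_id: str, sql: str,
                 hook_config: Optional[Dict[str, Any]] = None):
        self.conn_id = conn_id
        self.sql = sql
        self.hook_config = hook_config or {}

    def _get_hook(self) -> StubBigQueryHook:
        return StubConnection(self.conn_id).get_hook(**self.hook_config)


sensor = StubSqlSensor(
    conn_id="google_cloud_default",
    sql="SELECT 1",
    hook_config={"use_legacy_sql": False, "location": "europe-west1"},
)
hook = sensor._get_hook()
# Without hook_config the hook keeps its own defaults.
default_hook = StubSqlSensor(conn_id="google_cloud_default", sql="SELECT 1")._get_hook()
```

Because `hook_config` defaults to an empty dict, existing sensors that do not pass it would keep their current behaviour.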
@dinigo dinigo added the kind:bug This is a clearly a bug label Jul 29, 2021
@dinigo dinigo changed the title from "SqlSensor with Standard SQL Bigquery query" to "SqlSensor with Standard SQL BigQuery dialect (allow to pass config to injected (DI) hooks)" Jul 29, 2021
@eladkal
Contributor

eladkal commented Jul 29, 2021

duplicate of #13750
You are welcome to open a PR adding the requested functionality :)

@eladkal eladkal closed this as completed Jul 29, 2021
@eladkal eladkal added the duplicate Issue that is duplicated label Jul 29, 2021
@dinigo
Contributor Author

dinigo commented Jul 29, 2021

You are right! I was naively assuming that the rest of the DbApiHooks would also be using "magic" constants, but they aren't. Only Google does this sort of thing. I'll close the issue and, if I find some time, I'll open a PR against the one you mentioned.

Thanks @eladkal !!!
