diff --git a/airflow/sensors/sql.py b/airflow/sensors/sql.py index 51882b74f63cf..2708f633e3af9 100644 --- a/airflow/sensors/sql.py +++ b/airflow/sensors/sql.py @@ -48,6 +48,9 @@ class SqlSensor(BaseSensorOperator): :type failure: Optional :param fail_on_empty: Explicitly fail on no rows returned. :type fail_on_empty: bool + :param hook_params: Extra config params to be passed to the underlying hook. + Should match the desired hook constructor params. + :type hook_params: dict """ template_fields: Iterable[str] = ('sql',) @@ -58,7 +61,16 @@ class SqlSensor(BaseSensorOperator): ui_color = '#7c7287' def __init__( - self, *, conn_id, sql, parameters=None, success=None, failure=None, fail_on_empty=False, **kwargs + self, + *, + conn_id, + sql, + parameters=None, + success=None, + failure=None, + fail_on_empty=False, + hook_params=None, + **kwargs, ): self.conn_id = conn_id self.sql = sql @@ -66,6 +78,7 @@ def __init__( self.success = success self.failure = failure self.fail_on_empty = fail_on_empty + self.hook_params = hook_params super().__init__(**kwargs) def _get_hook(self): @@ -90,7 +103,7 @@ def _get_hook(self): f"Connection type ({conn.conn_type}) is not supported by SqlSensor. " + f"Supported connection types: {list(allowed_conn_type)}" ) - return conn.get_hook() + return conn.get_hook(hook_kwargs=self.hook_params) def poke(self, context): hook = self._get_hook() diff --git a/tests/sensors/test_sql_sensor.py b/tests/sensors/test_sql_sensor.py index fca387b1748e3..92ec1c7744888 100644 --- a/tests/sensors/test_sql_sensor.py +++ b/tests/sensors/test_sql_sensor.py @@ -253,3 +253,15 @@ def test_sql_sensor_presto(self): dag=self.dag, ) op.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + def test_sql_sensor_hook_params(self): + op = SqlSensor( + task_id='sql_sensor_hook_params', + conn_id='google_cloud_default', + sql="SELECT 1", + hook_params={ + 'delegate_to': 'me', + }, + ) + hook = op._get_hook() + assert hook.delegate_to == 'me'