Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding ClickHouse Provider #25714

Closed
wants to merge 19 commits into from
Closed

Adding ClickHouse Provider #25714

wants to merge 19 commits into from

Conversation

pateash
Copy link
Contributor

@pateash pateash commented Aug 14, 2022

closes: #10893


Description

Adding ClickHouse provider based on its Python SDK https://clickhouse-driver.readthedocs.io/en/latest/

Users can create their own custom operators leveraging the ClickHouseHook directly
or building their operator on ClickHouseOperator by providing result_processor method,

operator = ClickHouseOperator(
    task_id='clickhouse_operator',
    sql="SELECT * FROM gettingstarted.clickstream",
    dag=dag,
    result_processor=lambda cursor: print(cursor)
)

The sensor can be implemented by SQL

sensor = ClickHouseSensor(
    task_id="clickhouse_sensor",
    sql="SELECT * FROM gettingstarted.clickstream where customer_id='customer1'",
    timeout=60,
    poke_interval=10,
    dag=dag,
)

@pateash
Copy link
Contributor Author

pateash commented Aug 14, 2022

image

@pateash
Copy link
Contributor Author

pateash commented Aug 14, 2022

Testing providers with sql and template files(*_tf)

image

@pateash
Copy link
Contributor Author

pateash commented Aug 14, 2022

image

from airflow.utils.context import Context


class ClickHouseOperator(BaseOperator):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ClickHouseQueryOperator ?

also it could maybe be base on the https://pypi.org/project/apache-airflow-providers-common-sql/ ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its similar

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is similar ?

Copy link
Contributor

@eladkal eladkal Aug 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pateash can you elaborate on this point?
I think ClickHouseHook should inherit from DbApiHook and ClickHouseOperator should inherit from BaseSQLOperator

Is there a reason why we shouldn't do it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HI @eladkal, @raphaelauv ,
Sorry for the late response, I was OOO.

the reason I didn't used DBApiHook is because it's intended to be used with databases ( mostly transactional supporting sqlalchemy ),

and Clickhouse being a distributed OLAP, i really think that it will be better not to expose methods like insert_rows(), which uses sqlalchemy.
rather if someone wants to have this functionality they should use the underlying library ( clickhouse-driver ) and implement their own operator using the hook and connection object.

@pateash pateash requested a review from raphaelauv August 17, 2022 15:44
Comment on lines +68 to +72
# if database is provided use it or use from schema
if self.database:
connection_kwargs.update(database=self.database)
elif conn.schema:
connection_kwargs.update(database=conn.schema)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a source for confusion? Maybe we should customize the connection in the like we do with other providers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please provide more information?

here, if someone wants to override the default schema ( provided by connection ), they can pass the database argument.


result = hook.query(sql=self.sql, params=self.params)
if self.result_processor:
self.result_processor(result)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is result_processor implemented?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

result process is a callback which could be passed by the user to proess the result similar to arangodb provider.

from airflow.sensors.sql import SqlSensor


class ClickHouseSensor(SqlSensor):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I asked it elsewhere but if ClickHouseHook will inhert from DBApiHook then we won't need this custom sensor because SqlSensor will support ClickHouse natively https://github.com/pateash/airflow/blob/f7e2ffe42bb7e957e4e16d0cb65e9541c87bd72b/airflow/providers/common/sql/sensors/sql.py#L80

Copy link
Contributor Author

@pateash pateash Sep 22, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HI @eladkal,
Sorry for the late response, I was OOO.

the reason I didn't used DBApiHook is because it's intended to be used with databases ( mostly transactional supporting sqlalchemy ),

and Clickhouse being a OLAP, i really think that it will be better not to expose methods like insert_rows(), which uses sqlalchemy.
rather if someone wants to have this functionality they should use the underlying library ( clickhouse-driver ) and implement their own operator using the hook and connection object.

@pateash pateash requested review from eladkal and raphaelauv and removed request for potiuk, kaxil, raphaelauv, mik-laj and eladkal September 22, 2022 13:57
@sachebotarev
Copy link

sachebotarev commented Oct 29, 2022

Hi @pateash

I'm interested in having airflow support ClickHouse, but I don't want to have such a standard implementation.

  1. You realisation very similar Anton Bryzgalov's rather old code https://github.com/bryzgaloff/airflow-clickhouse-plugin. His code is suitable for plugins, but it looks strange in the provider
  2. I'm very confused by calling self.get_conn() in a constructor. The connection is not closed anywhere.
  3. clickhouse-driver supports DB API 2.0 so inheriting from DbApiHook and ClickHouseOperator will provide many out of the box solutions. insert_rows( ) can be overridden with Cursor.executemany or just rise exception.
cursor.executemany('INSERT INTO test (x) VALUES', [[200]])
  1. Your ClickHouseHook provides very limited functionality, compare for example with ExasolHook (also an analytical database).
  2. Your ClickHouseOperator does not provide the same capabilities (last result, many requests in template, xcom ).
    I would get these features (and more) just using SQLExecuteQueryOperator

@github-actions
Copy link

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.

@github-actions github-actions bot added the stale Stale PRs per the .github/workflows/stale.yml policy file label Dec 14, 2022
@github-actions github-actions bot closed this Dec 20, 2022
@vargacypher vargacypher mentioned this pull request Apr 20, 2024
2 tasks
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area:dev-tools area:providers kind:documentation stale Stale PRs per the .github/workflows/stale.yml policy file
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add new operator for ClickHouse
4 participants