[Data] Add read_clickhouse API to read ClickHouse Dataset #48817
base: master
Conversation
Signed-off-by: Connor Sanders <[email protected]>
    columns: Optional[List[str]] = None,
    filters: Optional[Dict[str, Tuple[str, Any]]] = None,
    order_by: Optional[Tuple[List[str], bool]] = None,
    client_settings: Optional[Dict[str, Any]] = None,
    client_kwargs: Optional[Dict[str, Any]] = None,
Let's make all optional args as kwargs
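To illustrate the reviewer's suggestion, here is a minimal sketch of a keyword-only signature, using the argument names from the diff above (the stub body is illustrative only; the real implementation lives in the PR):

```python
from typing import Any, Dict, List, Optional, Tuple

def read_clickhouse(
    table: str,
    dsn: str,
    *,  # the bare star makes every argument below keyword-only
    columns: Optional[List[str]] = None,
    filters: Optional[Dict[str, Tuple[str, Any]]] = None,
    order_by: Optional[Tuple[List[str], bool]] = None,
    client_settings: Optional[Dict[str, Any]] = None,
    client_kwargs: Optional[Dict[str, Any]] = None,
):
    """Illustrative stub: callers must spell out optional args by keyword."""
    return None
```

With this shape, `read_clickhouse("t", "dsn", ["a"])` raises a TypeError, while `read_clickhouse("t", "dsn", columns=["a"])` is accepted.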
    entity: str,
    dsn: str,
Can you please give an example of the DSN?
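For illustration only (the exact scheme and default port depend on the ClickHouse client library in use, and the credentials/host here are placeholders), a DSN is a URL-shaped connection string that standard URL parsing can decompose:

```python
from urllib.parse import urlparse

# Hypothetical DSN; scheme and port are assumptions, not the PR's spec.
dsn = "clickhouse+http://user:password@localhost:8123/default"
parts = urlparse(dsn)
# parts.hostname -> 'localhost', parts.port -> 8123, parts.path -> '/default'
```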
def __init__(
    self,
    entity: str,
nit: I'd suggest we employ a more common term like table (and expand in the py-doc that this could also be a view of one).
    filters: Optional fields and values mapping to use to filter the data via
        WHERE clause. The value should be a tuple where the first element is
        one of ('is', 'not', 'less', 'greater') and the second element is
        the value to filter by. The default operator is 'is'. Only strings,
        ints, floats, booleans, and None are allowed as values.
IIUC this is requiring the predicate in DNF format; let's call that out explicitly and add an example to help with understanding it.
Also, let's add a link to the ClickHouse documentation page explaining these parameters in more detail.
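As a concrete illustration of the filters mapping described in the docstring above (the column names are made up, and this helper is a sketch of the translation, not the PR's implementation):

```python
# Operator names from the docstring, mapped to their SQL spellings.
ops = {"is": "=", "not": "!=", "less": "<", "greater": ">"}

def to_where(filters):
    """Sketch: render an AND-ed WHERE clause from a filters mapping."""
    conds = []
    for col, (op, val) in filters.items():
        if val is None:
            conds.append(f"{col} IS NULL" if op == "is" else f"{col} IS NOT NULL")
        elif isinstance(val, str):
            conds.append(f"{col} {ops[op]} '{val}'")
        elif isinstance(val, bool):  # checked before int: bool subclasses int
            conds.append(f"{col} {ops[op]} {str(val).lower()}")
        else:
            conds.append(f"{col} {ops[op]} {val}")
    return "WHERE " + " AND ".join(conds)

# {"country": ("is", "US"), "age": ("greater", 18), "deleted_at": ("not", None)}
# -> WHERE country = 'US' AND age > 18 AND deleted_at IS NOT NULL
```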
    f"Unsupported operator '{op}' for filter on '{column}'. "
    f"Defaulting to 'is'"
)
op = "is"
Same as below
if value is None:
    operator = validate_non_numeric_ops(key, operator)
    if operator == "is":
        filter_conditions.append(f"{key} IS NULL")
    elif operator == "not":
        filter_conditions.append(f"{key} IS NOT NULL")
elif isinstance(value, str):
    operator = validate_non_numeric_ops(key, operator)
    filter_conditions.append(f"{key} {ops[operator]} '{value}'")
elif isinstance(value, bool):
    operator = validate_non_numeric_ops(key, operator)
    filter_conditions.append(
        f"{key} {ops[operator]} {str(value).lower()}"
    )
elif isinstance(value, (int, float)):
    filter_conditions.append(f"{key} {ops[operator]} {value}")
Let's split up value conversion from filter composition to avoid duplication
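One possible shape for that split, sketched here with hypothetical helper names (not the PR's code): convert the value to SQL text in one function, and compose the condition in another, so each type is handled exactly once.

```python
def render_value(value):
    """Convert a Python literal to its SQL text in one place."""
    if value is None:
        return "NULL"
    if isinstance(value, bool):  # must precede int: bool is an int subclass
        return str(value).lower()
    if isinstance(value, str):
        return f"'{value}'"
    return str(value)

def render_condition(column, sql_op, value):
    """Compose one WHERE condition from an already-converted value."""
    if value is None:
        return f"{column} IS NULL" if sql_op == "=" else f"{column} IS NOT NULL"
    return f"{column} {sql_op} {render_value(value)}"
```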
    op = "is"
return op

ops = {"is": "=", "not": "!=", "less": "<", "greater": ">"}
Let's use Python operators so that we're not reinventing the wheel here
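A sketch of that suggestion: accept the Python comparison spellings directly instead of the custom names 'is'/'not'/'less'/'greater' (the helper name here is hypothetical):

```python
# Python-style comparison spellings mapped to their SQL equivalents.
OPS = {"==": "=", "!=": "!=", "<": "<", "<=": "<=", ">": ">", ">=": ">="}

def translate_op(op: str) -> str:
    """Map a Python comparison operator to its SQL spelling, or fail loudly."""
    try:
        return OPS[op]
    except KeyError:
        raise ValueError(f"Unsupported operator {op!r}") from None
```

This also gets `<=` and `>=` for free, which the four custom names cannot express.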
# Fetch the fragments from the ClickHouse client
with self._client.query_arrow_stream(self._query) as stream:
    record_batches = list(stream)  # Collect all record batches
So the actual reading of the data needs to be performed inside the read task (that's currently consolidated in _read_fn).
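A hedged sketch of that structure; make_client and query stand in for however the datasource carries its connection info and generated SQL, and are not names from the PR:

```python
def make_read_fn(make_client, query):
    """Defer query execution into the per-task read function."""
    def _read_fn():
        # The client is constructed and the query is executed only when the
        # read task actually runs on a worker, not when the datasource is built.
        client = make_client()
        with client.query_arrow_stream(query) as stream:
            for batch in stream:
                yield batch
    return _read_fn
```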
I'd also recommend taking a look at SQLDatasource to see how it could be structured in a way compatible with Ray read APIs.
Ah, I see what you mean. I'll make that change. Thank you for pointing that out!
Co-authored-by: Alexey Kudinkin <[email protected]> Signed-off-by: Connor Sanders <[email protected]>
Why are these changes needed?
Greetings from Elastiflow!
This PR introduces a new ClickHouseDatasource connector for Ray, which provides a convenient way to read data from ClickHouse into Ray Datasets. The ClickHouseDatasource is particularly useful for users who are working with large datasets stored in ClickHouse and want to leverage Ray's distributed computing capabilities for AI and ML use-cases. We found this functionality useful while evaluating ML technologies and wanted to contribute this back.
Key Features and Benefits:
Tested locally with a ClickHouse table containing ~12M records.
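From a user's perspective, a hypothetical call to the proposed API might look like the following. The table, columns, and DSN are made up for illustration, the table keyword follows the reviewer's suggested naming, and the function is defined but not executed here since it would need a live ClickHouse server and a Ray build that includes this PR:

```python
def load_flows(dsn: str):
    """Hypothetical usage of the proposed read_clickhouse API."""
    import ray  # imported lazily; requires a Ray build with this PR applied

    return ray.data.read_clickhouse(
        table="default.flows",                  # table (or view) to read
        dsn=dsn,
        columns=["src_ip", "dst_ip", "bytes"],
        filters={"bytes": ("greater", 0)},      # WHERE bytes > 0
    )
```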
Related issue number
Checks
- I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
- I've run scripts/format.sh to lint the changes in this PR.
- If I've added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.