feat(flink): temporal join support #8412

Conversation

mfatihaktas
Contributor

@mfatihaktas mfatihaktas commented Feb 21, 2024

Description of changes

Explorative PR towards addressing #8247.

This is my first time getting hands-on with sqlglot changes :) So, particularly seeking suggestions on

  • Temporal join API
  • The newly added op VersionedDatabaseTable and expression TemporalJoin.

Note: Added support only in the Flink backend for now. I have not yet worked out how to raise an error when the user calls temporal_join() on a backend that does not support temporal joins. I plan to address this once we reach an agreement on the API.

Issues closed

@mfatihaktas mfatihaktas force-pushed the feat/temporal-join-after-sqlglot branch from 314533b to 47ef46a Compare February 23, 2024 21:15
# presence of other use cases.
# - Implement a way to turn off aliasing for temporal joins.
# - This also seems to require additional tech debt.

Contributor Author

@kszucs I documented an issue that I could not find a solution for as a long comment in visit_VersionedDatabaseTable(). To summarize, I am not able to get at_time to compile with the right alias because tables get copied internally. By the time SQL is generated for the nodes, at_time is bound to a "stale" reference of the corresponding table. This leads to incorrect SQL for at_time, e.g., t0.timestamp_col where it should be t2.timestamp_col. This issue arises in both the initial API and the one you suggested.

I failed to resolve or work around this in my initial attempts (though I learned quite a bit about the compilation process in Ibis) but will continue looking into it. In the comment, I noted the two approaches that came to mind, but both are quite tedious and likely unnecessary. I feel that I am missing something obvious due to my limited knowledge of the code base :) Do you have any suggestions for me?

Contributor Author

Reading the source code more, I might be wrong in my judgement that the table copies are "spoiling" the table references. The table reference disconnect for at_time seems to originate from the JoinTables constructed from DatabaseTable -- see the copied IR graph. Will update here once I get a better understanding.

[image: IR graph]

Member

We need to wrap the join participant tables in JoinTable relations, so that the same table can be added to the join several times. This way we ensure that all tables in the join are distinct and the predicates reference distinct tables.

This has the drawback that all value expressions referencing the wrapped table must be replaced with ones referencing the JoinTable, like Field(r) => Field(JoinTable(r)). We call this "dereferencing", and it is handled by prepare_predicates().
This means that both table_left and table_right should only be connected to their respective JoinTable nodes. There is one exception in the graph, timestamp_col, which indicates that it is not getting dereferenced to the join tables. Most certainly the problem is with the at_time here:

link = ops.TemporalJoinLink(how="temporal", table=right, at_time=at_time, predicates=preds)

Before passing it to the join link, at_time should be dereferenced:

deref_left = dereference_mapping_left(chain)
at_time = at_time.replace(deref_left, filter=ops.Value)

Let me know whether that helps or not, then we can iterate with this PR. In the meantime I am trying to have a more detailed look at flink temporal joins.
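
The dereferencing step described above can be illustrated with a toy model. This is only a sketch under loud assumptions: the `Table`, `JoinTable`, and `Field` classes and the `dereference` helper below are hypothetical stand-ins, not Ibis's actual classes or its `dereference_mapping_left()` implementation. It shows a field built from a raw table being rebased onto the join wrapper of that table.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Table:
    name: str


@dataclass(frozen=True)
class JoinTable:
    parent: Table
    index: int  # distinguishes repeated uses of the same parent in one join


@dataclass(frozen=True)
class Field:
    rel: object  # a Table or a JoinTable (hypothetical toy types)
    name: str


def dereference(field: Field, mapping: dict) -> Field:
    """Rebase a field onto the JoinTable that wraps its relation, if any."""
    return Field(mapping.get(field.rel, field.rel), field.name)


left = Table("table_left")
right = Table("table_right")
join_left, join_right = JoinTable(left, 0), JoinTable(right, 1)
mapping = {left: join_left, right: join_right}

# The user builds at_time from the raw table, so it must be rebased
# before it is attached to the join.
at_time = Field(left, "timestamp_col")
rebased = dereference(at_time, mapping)
```

After the call, `rebased` points at `join_left` while the original `at_time` still points at the raw `left` table, which mirrors the stale `t0.timestamp_col` reference discussed earlier in the thread.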

Contributor Author

Thanks @kszucs for the detailed explanation. Reading the source code a bit more, I was able to figure out that the main reason is indeed the wrapping of participant tables in JoinTable. For the initial API, I was able to dereference at_time as you explained, but using a much uglier hack 😄:

    def rebase_at_time_on_join_table(self, at_time: Column):
        """This is a "hack" only to demonstrate the challenge with generating
        the correct table alias for `at_time`. Shall be refactored out.

        Creates and returns a new `at_time` such that
        * New `at_time` is created from JoinTable `join_table`
        * `join_table` got created in `relations.py: temporal_join()` from DatabaseTable
        `database_table`.
        * The arg `at_time` got created from `database_table` by the user in the first
        place.

        """
        from ibis.common.graph import Graph

        at_time_source_table = at_time.op().find(finder=ops.DatabaseTable)[0]

        op = self.op()
        graph, dependents = Graph.from_bfs(op).toposort()
        at_time_join_table_op = dependents[at_time_source_table][0]
        at_time_join_table = Table(at_time_join_table_op)
        return at_time_join_table[f"{at_time.op().name}"]

I will replace this with your suggestion and share the update here.

For the second API (initiated by your review), I was not able to dereference at_time successfully -- my dereferencing "hack" messed up the dereferencing done for the predicates. I will try doing it with dereference_mapping_left() and replace(), and will share the update here.

Contributor Author

@kszucs I was able to use dereference_mapping_left() and replace() to "rebase" the at_time field on the left JoinTable. I added TODO comments for things that I am not certain about.

Member

Thanks! Going to have another look hopefully tomorrow.

@mfatihaktas mfatihaktas requested a review from kszucs February 23, 2024 21:27
@mfatihaktas mfatihaktas self-assigned this Feb 26, 2024
@mfatihaktas mfatihaktas added the flink Issues or PRs related to Flink label Feb 26, 2024
@mfatihaktas mfatihaktas force-pushed the feat/temporal-join-after-sqlglot branch 2 times, most recently from bea7747 to 00cd6cd Compare February 26, 2024 20:49
@mfatihaktas mfatihaktas force-pushed the feat/temporal-join-after-sqlglot branch from 00cd6cd to 01bacb2 Compare February 26, 2024 23:21
"Right-table is not versioned. "
"Temporal join is defined only when the right-table is versioned."
)
elif at_time is None and right.at_time is None:
Member

Is this just a convenience to support defining at_time for both the table and the join operation?

What I am thinking is to restrict the API here so that the ops.VersionedDatabaseTable's at_time attribute becomes mandatory and we remove the at_time argument here, something like:

ordinary = con.table("my_ordinary_table")
versioned = con.table("my_versioned_table")

ordinary.temporal_join(
    versioned.at_time(ordinary.timestamp_col),
    [...additional predicates...]
)

That way we could remove the ops.TemporalJoinLink node as well: it would no longer need an extra at_time property, because at_time would always be defined on the right table (how would carry the information about the join kind).

Member

Actually the nicest would probably be your example from #8247:

expr = table_left.join(
    table_right.at_time(table_left.event_time_attribute),
    table_left.on_field == table_right.on_field
)

If the right table passed to .join() is a versioned table, then the join must be a temporal join.

Contributor Author

Thanks. As you raised this earlier, I also like this API (API-2) more compared to the first one (API-1) requiring the at_time argument in temporal_join(). I have kept both APIs so far in case one required a more complex implementation than the other, which might have influenced our choice. This did not turn out to be the case, so I will remove the option with the at_time argument.

> If the right table passed to .join() is a versioned table, then the join must be a temporal join.

This is a good idea, will make this change.

There are, however, a few things that are currently not ideal with the implementation of API-2. I documented these items in TODOs. Here is a summary of them:

  • The current implementation requires table_right.at_time(...) to be on a separate line, as in
        table_right = table_right.at_time(table_left.timestamp_col)
        expr = table_left.temporal_join(
            table_right,
            predicates=[
                table_left["id"] == table_right["id"],
            ],
        )

If we do not overwrite table_right as given above, i.e.,

        expr = table_left.temporal_join(
            table_right.at_time(table_left.timestamp_col),
            predicates=[
                table_left["id"] == table_right["id"],
            ],
        )

we will get

        E ibis.common.exceptions.IntegrityError: Cannot add <ibis.expr.operations.logical.Equals object at 0x17c374270> to projection, they belong to another relation

This is because at_time() returns a new copy of table_right and the predicates are defined with the old copy of table_right.

  • On removing ops.TemporalJoinLink:

> That way we could remove the ops.TemporalJoinLink node as well: it would no longer need an extra at_time property, because at_time would always be defined on the right table (how would carry the information about the join kind).

I tried to implement this exact idea previously, accessing at_time through join_op -> table_right_op -> at_time_op. join_sql() in dialects.py is actually a deprecated leftover from this effort; I kept it for reference during the reviews.
We first need to rebase table_right.at_time on join_table_right with

        # Rebase `at_time` on the JoinTable wrapping up the DatabaseTable
        # `at_time` field is based on.
        deref_left = dereference_mapping_left(left)
        at_time = at_time_op.replace(deref_left, filter=ops.Value)

Then, to keep a reference to the rebased at_time, we need to make it an attribute of right. The only way I could come up with to do this was

right = right.op().copy(at_time=at_time)

However, this led to the same error given above

        E ibis.common.exceptions.IntegrityError: Cannot add <ibis.expr.operations.logical.Equals object at 0x17c374270> to projection, they belong to another relation

This is again because now the (old) right table referenced in the predicate is different than the new right table. As far as I understand it, to get this working, we would need to have an additional step where we update the relevant predicates with the new right before calling prepare_predicates().
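
Both errors above come down to a predicate that still points at a table object which is no longer part of the join. A rough, self-contained sketch of that failure mode follows. Everything here is hypothetical: the toy `Table` and `Field` classes, the `with_at_time` method (named that way only to avoid clashing with the `at_time` attribute), and the `check_predicate` helper are illustrations, not Ibis code, and the real IntegrityError check is more involved.

```python
from dataclasses import dataclass, replace
from typing import Optional


@dataclass(frozen=True)
class Table:
    name: str
    at_time: Optional[str] = None

    def with_at_time(self, column: str) -> "Table":
        # Returns a new object; the original table is left untouched.
        return replace(self, at_time=column)


@dataclass(frozen=True)
class Field:
    table: Table
    name: str


def check_predicate(pred_fields, join_tables):
    """Raise if a predicate field belongs to a table that is not in the join."""
    for field in pred_fields:
        if field.table not in join_tables:
            raise ValueError(f"{field.name} belongs to another relation")


left = Table("table_left")
right = Table("table_right")

# Inline form: the predicate still points at the un-versioned `right`.
versioned = right.with_at_time("timestamp_col")
stale = [Field(left, "id"), Field(right, "id")]
try:
    check_predicate(stale, [left, versioned])
    inline_ok = True
except ValueError:
    inline_ok = False

# Rebinding first: the predicate is built from the versioned table.
right = right.with_at_time("timestamp_col")
fresh = [Field(left, "id"), Field(right, "id")]
check_predicate(fresh, [left, right])  # does not raise
```

In this toy model the inline form fails and the rebinding form passes, which is the same shape as the two `temporal_join()` calls shown above. Updating the predicates to reference the new right table before validation would be the analogous fix.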

@kszucs Do you have suggestions or ideas on addressing these two points?

Contributor Author

@mfatihaktas mfatihaktas Feb 27, 2024

I just "discovered" that Flink supports two different kinds of temporal join with LEFT and INNER. I did not read this in the docs, just found out by trying it out. With this, how in TemporalJoinLink would be set to either left or inner. This might be another reason to keep TemporalJoinLink rather than "encoding" the join type in JoinLink as temporal-left or temporal-inner.
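
For reference, a small sketch of what the two join kinds look like as Flink SQL, using Flink's `FOR SYSTEM_TIME AS OF` clause. The `temporal_join_sql` helper and the table and column names (`orders`, `rates`, `order_time`, `currency`) are made up for illustration; this is not the compiler code from this PR.

```python
def temporal_join_sql(left, right, at_time, on, how="inner"):
    """Render a Flink-style event-time temporal join as a SQL string."""
    if how not in ("inner", "left"):
        raise ValueError(f"unsupported temporal join kind: {how!r}")
    keyword = "LEFT JOIN" if how == "left" else "JOIN"
    return (
        f"SELECT * FROM {left} "
        f"{keyword} {right} FOR SYSTEM_TIME AS OF {left}.{at_time} "
        f"ON {left}.{on} = {right}.{on}"
    )


# Hypothetical table and column names, chosen only for the example.
left_sql = temporal_join_sql("orders", "rates", "order_time", "currency", how="left")
inner_sql = temporal_join_sql("orders", "rates", "order_time", "currency")
```

The only difference between the two outputs is the join keyword, which is why `how` can carry `left` or `inner` while the temporal part stays in a separate property.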

@kszucs
Member

kszucs commented Feb 27, 2024

Is there another ibis backend which supports this kind of temporal join? I would like to avoid introducing global APIs highly tailored towards certain backends since it would defeat the purpose of ibis. Having it implemented for another backend could validate the API and IR design we have here.

@kszucs
Member

kszucs commented Feb 27, 2024

I just had a closer look at the issue description, in which you have nicely collected the necessary information; apparently the options are RisingWave and MySQL. How much work would it be to support RisingWave here?

@@ -330,6 +340,13 @@ class DatabaseTable(PhysicalTable):
namespace: Namespace = Namespace()


@public
class VersionedDatabaseTable(DatabaseTable):
Member

@kszucs kszucs Feb 27, 2024

Can other table-like objects be versioned? I'm thinking of views or generic SELECT queries here. If yes, it might be better to have a TableSnapshot relation instead, referencing an arbitrary parent relation, like:

class TableSnapshot(Relation):
    parent: Relation  # this would be an ops.DatabaseTable in most of the cases
    at_time: Column[dt.Timestamp]

Member

I believe we already discussed this, and the answer was no. Even if that turns out to be wrong, it should be addressed in a follow-up PR.

Contributor Author

In my understanding, intermediate tables in Flink such as SELECT queries can be versioned and queried only with Paimon in place. Without Paimon, Flink supports versioned views, against which temporal join can be performed.

@mfatihaktas
Contributor Author

> Is there another ibis backend which supports this kind of temporal join? I would like to avoid introducing global APIs highly tailored towards certain backends since it would defeat the purpose of ibis. Having it implemented for another backend could validate the API and IR design we have here.

> I just had a closer look at the issue description, in which you have nicely collected the necessary information; apparently the options are RisingWave and MySQL. How much work would it be to support RisingWave here?

Adding support for RisingWave would have been great, but RisingWave seems to support only processing-time temporal join. This PR adds support for only event-time temporal join. In event-time temporal join, rows are joined with their right-table versions specified by a given time-attribute (at_time in this PR). In processing-time temporal join, rows are always joined with their most-recent version in the right-table.
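
The difference between the two semantics can be sketched in plain Python. This is a simplified, batch-style illustration with made-up data and hypothetical helper names, not how Flink or RisingWave implement it. In particular, a real processing-time join uses whichever version is current at the moment each row is processed, which this sketch reduces to "the latest known version".

```python
from bisect import bisect_right

# Versions of one right-table key, sorted by when each version became valid.
# (valid_from, rate) pairs; the values are made up for illustration.
versions = [(1, 1.10), (5, 1.20), (9, 1.30)]


def event_time_lookup(versions, event_time):
    """Return the version valid at `event_time`, or None if none existed yet."""
    starts = [valid_from for valid_from, _ in versions]
    i = bisect_right(starts, event_time)
    return versions[i - 1][1] if i else None


def processing_time_lookup(versions):
    """Return the most recent version, regardless of the row's own timestamp."""
    return versions[-1][1] if versions else None


left_rows = [("a", 0), ("b", 5), ("c", 7), ("d", 12)]  # (id, event_time)
event_joined = [(rid, event_time_lookup(versions, t)) for rid, t in left_rows]
proc_joined = [(rid, processing_time_lookup(versions)) for rid, _ in left_rows]
```

Row `a` has no matching version under event-time semantics; an inner temporal join would drop it, while a left temporal join would keep it with a null on the right side.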

For MySQL, I could not find enough information on whether it supports temporal join or not. At the time, I could find only this article that discusses support for performing joins against temporal tables, which does not have the same semantics as Flink's temporal join. In my understanding, it is for joining against a particular version of the table that is specified by a given timestamp rather than a time attribute. So it is like first performing a time travel on the table(s) and then joining them.

@cpcloud
Member

cpcloud commented Sep 30, 2024

Closing, as I don't think I can maintain this code at the moment. We can always revisit later!

Labels
flink Issues or PRs related to Flink