Add INSERT OVERWRITE to Trino SQL #11602

Closed

georgewfisher opened this issue Mar 21, 2022 · 10 comments

@georgewfisher

georgewfisher commented Mar 21, 2022

Add INSERT OVERWRITE to Trino SQL to allow connectors to provide overwrite functionality without the need for session parameters.

Current functionality: example multi-stage query:

  1. SET SESSION hive.insert_existing_partitions_behavior='OVERWRITE';
  2. INSERT INTO hive.test2.insert_test SELECT * FROM tpch.sf1.customer;

Can instead be written like this:

  1. INSERT OVERWRITE hive.test2.insert_test SELECT * FROM tpch.sf1.customer;

Pull request here proposing implementation: #11603

Justifications:

  • Allow for insert overwrite to be used by clients who do not have access to session properties
  • Simplify insert overwrite for query writers who do not find session properties easy to use or remember
  • Standardize insert overwrite across all connectors in Trino
  • Uses syntax that is common to Spark and Hive (see the sketch below)
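
For reference, a sketch of the existing syntax in Hive and Spark that this proposal mirrors (the table and schema names are illustrative):

-- Hive (the TABLE keyword is required)
INSERT OVERWRITE TABLE test2.insert_test SELECT * FROM customer;

-- Spark SQL (the TABLE keyword is optional)
INSERT OVERWRITE test2.insert_test SELECT * FROM customer;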

Background

I have been working with Trino and Presto in large production environments and “insert overwrite” has been a consistent ask from users. Trino has proven to be an extremely flexible data access layer that democratizes data consumption and transformation, but the ability to easily update snapshots of data is a common use case, and when that need goes unmet, users pick other platforms to solve their scenarios. The context I want to emphasize: Trino users span a broad mix of skill levels and familiarity with SQL; the simplicity of INSERT OVERWRITE is popular in Spark and would likely be popular in Trino as well.

Real World Scenario

A scenario simplified by the ability to update a table easily is the materialization of metrics. Often a live query will power a chart, but the team will want either to persist results on a cadence so they have a historical record, or to precompute/materialize the results so that they can be loaded more quickly.
The scenario (a sketch of the proposed syntax follows the list):

  1. Customer has a query that reads from multiple catalogs of different kinds
  2. Customer wants to write data to a Hive table
  3. Customer wants to be able to:
    a. Replace all data with latest results
    b. Update data when a result is incorrect
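
As a sketch, the proposed syntax would reduce the recurring materialization job to a single statement (the catalog, schema, table, and metric here are illustrative, not from the proposal):

-- hypothetical metrics materialization job, run on a cadence
INSERT OVERWRITE hive.metrics.daily_revenue
SELECT orderdate, sum(totalprice) AS revenue
FROM tpch.sf1.orders
GROUP BY orderdate;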

Reasons for Preferring Insert Overwrite over Session Properties

The solution to this problem today is to set the overwrite session property (insert_existing_partitions_behavior) on the Hive catalog containing the target table.
When trying to set the session property, these are the issues that come up:

  1. The customer is using a client that does not fully implement session properties
  2. The customer can only make one call to Trino to do the insert
  3. The customer cannot remember the name of the session property (and most of them do not use the CLI, they use visualization platforms)
  4. The customer is focused on business analytics, not data engineering, and does not want to set a session property
  5. The customer thinks overwriting is not supported, goes to Spark and sends us a message: "how can I translate this query into Spark SQL?"

This broad set of issues creates real operational pain because it can lead to problematic workarounds, such as automatically setting overwrite behavior for specific user roles or sources.

Expected Points of Discussion

Response: The objective of this change is to reduce the friction of using Trino by addressing the pain users experience when trying to solve the updateable-partition scenario.

  • Supporting INSERT OVERWRITE will bring about alignment with Spark and Hive, which is the basis for user expectations.
  • Connectors can choose whether to support INSERT OVERWRITE and how to relate it to the underlying logical structure.

Alternatives:
a. Allow session properties to be set within the query text
  • Requires the endpoint to accept multiple queries, or the creation of a new language to describe these parameters
b. Connectors could support MERGE on non-ACID tables when updating on a partition boundary
  • This is a more complex change
  • Would have to be implemented in each connector
  • MERGE is not user friendly to non-data engineers
c. We could implement the Materialized View interface for Hive, which solves a similar scenario
  • Would need to align with the Starburst implementation or design a parallel implementation

Conclusion

  • Insert overwrite is simple, user-friendly syntax that is familiar to users of Spark and Hive
  • Insert overwrite, in my experience, has been a core scenario for many teams using Trino and therefore a good candidate for promotion beyond a session property
@findepi
Member

findepi commented Mar 21, 2022

Current functionality: example multi-stage query:

  1. SET SESSION hive.insert_existing_partitions_behavior='OVERWRITE';
  2. INSERT INTO hive.test2.insert_test SELECT * FROM tpch.sf1.customer;

Can instead be written like this:

  1. INSERT OVERWRITE hive.test2.insert_test SELECT * FROM tpch.sf1.customer;

This was a conscious decision to go with a session property; explicit syntax has been considered before.

@georgewfisher
Author

@findepi Dain also mentioned here that this has been extensively discussed, but I couldn't find this on Slack or here in GitHub. Do you have a link?

In my Slack post, Martin mentioned that MERGE was the ANSI solution. If MERGE is implemented for Hive ACID (#7708), it is conceivable that it could be extended to allow MERGE to update partitions in non-ACID tables.

I didn't post this just out of instinct - I have encountered real world situations where moving beyond the session property was desirable. I'll be curious to see the discussion around this feature so I don't duplicate any thinking.

  • I will track this going forward and see what help I can offer with MERGE.

@georgewfisher
Author

@findepi Additional question: doesn't project Tardigrade move Trino into a Spark-like space, where INSERT OVERWRITE has proven valuable?

@findepi
Member

findepi commented Mar 29, 2022

I will let @arhimondr comment on Tardigrade

cc @losipiuk

@findepi
Member

findepi commented Mar 30, 2022

also cc @brianzhan

@arhimondr
Contributor

Tardigrade is rather orthogonal to this problem. The main goal of project Tardigrade is to provide the necessary execution primitives in Trino to allow the system to tolerate faults and utilize resources more efficiently. These capabilities will help with long-running, memory-intensive queries that are common in the ETL space.

However I do have some thoughts around this problem.

There are basically two reasons why the OVERWRITE syntax hasn't been added:

  1. Philosophical: In Trino we are trying to stay as close to ANSI SQL as possible.
  2. Practical: Very often users actually need to overwrite a single partition and not an entire table. Currently Trino doesn't expose a concept of partition in the SQL interface, so adding the OVERWRITE syntax without adding the concept of PARTITION only partially solves the problem. And adding a concept of PARTITION at the SQL level is problematic: most connectors (except Hive) don't expose "partitions".

However, since the OVERWRITE semantics are very common in the ETL space, this problem has been approached twice:

The first (canonical) approach was to use transactions to achieve similar semantics:

Spark:

INSERT OVERWRITE students PARTITION (student_id = 222222)
    SELECT name, address FROM persons WHERE name = "Dora Williams";

Trino:

BEGIN TRANSACTION;
DELETE FROM students WHERE student_id = 222222;
INSERT INTO students SELECT name, address FROM persons WHERE name = 'Dora Williams';
COMMIT;

This would've been the ideal solution to the problem. However (I don't remember the specifics, maybe @martint does), there were some implementation issues with this solution, and as an intermediate step it was decided to fall back to a session property approach, insert_existing_partitions_behavior.

@georgewfisher What do you think about the transactional approach? Do you see any fundamental problems with it? If it works conceptually, I wonder whether it makes sense to resurrect it (fix all the bugs) and use it instead of a session property.

@findepi
Member

findepi commented Apr 1, 2022

(Closing per earlier discussion)

@findepi findepi closed this as completed Apr 1, 2022
@dain
Member

dain commented May 16, 2022

@findepi Dain also mentioned #11603 (comment) that this has been extensively discussed, but I couldn't find this on Slack or here in GitHub. Do you have a link?

This feature has been discussed since the moment we added write support to Trino (Presto back then). It is a feature used extensively at FB, but this "feature" has always been a hack to make Hive happy. Anyway, these discussions predate Slack and many other tools we use today.

My hope in general is that the Hive table format dies over the next few years (and everyone switches to Iceberg). The modern formats, like Iceberg, support row-level deletes and modifications, so this old technique should just die. In the meantime, you can use the session property.

@xkrogen
Member

xkrogen commented Oct 16, 2024

Going to reuse this closed issue just to capture/summarize discussion that happened on Slack, given its ephemeral nature.

As of today, it seems we still don't have a good way to do an atomic partition-level swap on Iceberg in Trino.

Let’s say I have table t with partition column p and I want to completely replace partition p=p1 with some new data from table s. (The equivalent of Hive’s INSERT OVERWRITE operation, or the newReplacePartitions API in Iceberg.)

We can of course do this non-atomically like:

DELETE FROM t WHERE p = 'p1';
INSERT INTO t SELECT * FROM s;

But the atomicity mentioned above isn't currently supported by Trino. #15385 would resolve this, even if it were constrained to single-table transactions, but it appears that, at least as of the last discussion there, maintainers have little interest in adding such support. Something we should probably revisit.

The best I can come up with to be able to do this atomically (using currently supported functionality in Trino) is a MERGE INTO where we pull all of the rows from the target partition as part of the source table, something like below. But performance will likely be quite poor given that we are forcing Trino to re-match all existing rows, and it's also a horrible UX.

MERGE INTO t
USING (
  SELECT *, 'source' AS src FROM s
  UNION ALL
  SELECT *, 'target' AS src FROM t WHERE p = 'p1'
) AS source
ON t.p = source.p AND ... -- whatever other conditions are necessary for uniqueness of the match
-- for rows which match on both sides, update them to match the new content
WHEN MATCHED AND (source.src = 'source') THEN UPDATE SET p = source.p, ... -- set all of the other fields
-- for rows which existed in the target but not the source, delete them
WHEN MATCHED AND (source.src = 'target') THEN DELETE
-- add new rows from the source
WHEN NOT MATCHED AND (source.src = 'source') THEN INSERT VALUES (...)

@cwsteinbach also pointed out that performance will be suboptimal for either MERGE or the transactional DELETE/INSERT, because currently all Trino Iceberg writes use MoR (merge-on-read), whereas this is an ideal case for CoW (copy-on-write). See #17272 for more on this point.
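
For context, Iceberg itself models this choice as table properties on the write path. A sketch of how they would be set from an engine that supports them (e.g. Spark SQL); per #17272, Trino currently writes deletes as MoR regardless of these settings:

ALTER TABLE t SET TBLPROPERTIES (
  'write.delete.mode' = 'copy-on-write',
  'write.update.mode' = 'copy-on-write',
  'write.merge.mode' = 'copy-on-write'
);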

Also, just generally: I think most of us agree that this is a common operation required by users, and we should find some way to make it easier / more performant for them, hopefully without fully leaking implementation details the way an explicit PARTITION keyword would. I think CoW + transaction support achieves this, though perhaps there are other ways as well.

@xkrogen
Member

xkrogen commented Oct 28, 2024

Another update/summary since the Slack conversation has progressed a fair amount:

We had varying opinions (from @hashhar / @findepi) on whether transaction support in Trino is something that makes sense to continue maintaining. Lots of agreement that getting rid of the current semi-transactional support in Hive would be a good move :) My impression is that the conversation converged on thinking that supporting Iceberg transactions (with constraints as needed, e.g. single-table) is sensible. I also called out that the Iceberg community is working on multi-table txns: apache/iceberg#10617

@dain pointed out one issue with transactions, even in Iceberg:

The bigger problem with transactions is that they cannot be moved between coordinators today. This makes transactions problematic for systems like the Gateway, multiple coordinators, coordinator HA, and so on. If we do want to support transactions in the long run, I think we want movable transactions, and that would require the Iceberg transaction system to be movable (the easiest way is to externalize transactions to the REST store or to use branch/tagging).

A WAP (write-audit-publish) flow would satisfy the atomic-replacement use case without some of the pitfalls of transactions, so perhaps it's a better mechanism, though it is more complex to set up for readers. We would need proper support for branches (#12844); a hypothetical sketch follows.
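
For illustration only, a WAP flow might look roughly like the following, modeled on Spark's Iceberg branch syntax and the Iceberg fast_forward procedure; none of this is supported in Trino today, which is what #12844 tracks:

-- stage the partition replacement on an audit branch (hypothetical syntax)
ALTER TABLE t CREATE BRANCH audit;
-- Spark addresses branches as t.branch_<name>
DELETE FROM t.branch_audit WHERE p = 'p1';
INSERT INTO t.branch_audit SELECT * FROM s;
-- audit the staged data before readers see it
SELECT count(*) FROM t.branch_audit WHERE p = 'p1';
-- publish by fast-forwarding main to the audited branch
CALL system.fast_forward('t', 'main', 'audit');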
