
[RFC] Accelerate Spark SQL queries by covering index #298

Closed
dai-chen opened this issue Apr 1, 2024 · 5 comments

dai-chen commented Apr 1, 2024

Is your feature request related to a problem?

Currently, covering indexes are ingested and exposed directly in OpenSearch. While users can utilize this data for visualization, alerting, and reporting, and manage its lifecycle independently, Spark SQL queries on the source table are not accelerated by the covering index data.

What solution would you like?

We propose improving the performance of Spark SQL queries by leveraging covering indexes. Specifically, this involves rewriting queries to use the covering index data built from the same source table.

Use Cases

After visualizing the data to gain initial insights, users typically seek more in-depth analysis to uncover underlying trends, patterns, and potential root causes driving the observed phenomena. This phase often involves examining various data points, and conducting comprehensive root cause analysis. The proposed feature aims to expedite this in-depth analysis process by utilizing the pre-existing covering index data within OpenSearch, thereby eliminating the necessity for redundant scans of the source data.

Open questions regarding this proposal:

  1. Use Case: Is it acceptable for users to query the OpenSearch index directly, as proposed in the Alternatives section below? Is that more natural, since visualization and DSL queries are already built on the index, or is a Spark SQL query experience required?
  2. Consistency:
    a. Currently, we only guarantee eventual consistency, and the freshness of OpenSearch index data is determined by the latency of index refresh.
    b. Users may have difficulty viewing the latest data if query rewrite is always enabled.
  3. Data Management: Unlike the current mental model, the index will be exposed to both external customers and internal use. For example, if users delete the OpenSearch index data fully or partially, it may impact the correctness of rewritten queries.
  4. Security: If query rewrite happens in Spark and any access control is enabled, could the rewrite pose a security risk?
  5. Performance: It is uncertain whether query rewriting in Spark will improve latency. Comparing scanning S3 directly versus scanning the OpenSearch index in Spark and transmitting the data back to OpenSearch, the outcome depends on the covering index size, the query result size, and predicate pushdown to OpenSearch.
  6. Dev Efforts: One of the challenges is how to rewrite a query using a partial covering index. A proof of concept is necessary to evaluate feasibility.

What alternatives have you considered?

Alternatively, users can:

  1. Query the index data directly via OpenSearch DSL or SQL; or
  2. Query the OpenSearch table directly via SparkSQL (PoC item 1)

Do you have any additional context?

Most of the discussion here applies to query rewrite for materialized views. For clarity, this will be discussed separately.

@dai-chen dai-chen added the enhancement label Apr 1, 2024
@dai-chen dai-chen changed the title [RFC] Accelerate Spark SQL Queries by Covering Index [RFC] Accelerate Spark SQL queries by covering index Apr 1, 2024
@dai-chen dai-chen removed the untriaged label Apr 1, 2024

anirudha commented Apr 1, 2024

  1. The OS index will be used for the rewrite; we may not depend on the freshness of the S3 data. OS-Flint doesn't do edit/modify: it's an append-only use case.

@dai-chen dai-chen added the feature and 0.4 labels and removed the enhancement label Apr 1, 2024

dai-chen commented Apr 1, 2024

Design Option: Query Rewriting in Spark using Index Data Only

One design option involves leveraging the Flint optimizer within Spark to perform query rewriting, similar to today's skipping index query rewriting. In this approach, eligible queries are rewritten to use covering index data exclusively from OpenSearch.

[Screenshot: design diagram, 2024-04-01]

Workflow

  1. Users send queries to the asynchronous query endpoint in OpenSearch SQL.
  2. OpenSearch SQL submits the queries to Spark.
  3. The Flint optimizer in Spark rewrites the query plan using OpenSearch index data.
  4. Spark executes the queries and scans the covering index data in OpenSearch.
  5. The Flint application stores the query results in the query result index within OpenSearch.
  6. OpenSearch SQL reads the query results and returns them to the users.

Example

# Create covering index
CREATE INDEX all_cols_idx ON http_logs;

# Spark rewrites the query and executes it, pulling data from the OS index into the result index
SELECT * FROM http_logs;

# Logical plan before and after rewriting:
#   LogicalRelation("spark_catalog.default.http_logs")
#     => LogicalRelation("opensearch.default.flint_opensearch_default_http_logs_all_all_cols_idx")

# Execution:
# Query OS: /flint_opensearch_default_http_logs_all_all_cols_idx/_search
# Load into Spark InternalRow
# Write OS: /<query result index>/_bulk

# UI pulls result from query result index
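The plan substitution shown above can be sketched as a simple rule. This is an illustrative Python model, not Flint's actual optimizer API; the class names and matching logic are assumptions for demonstration.

```python
# Hypothetical sketch of the covering-index rewrite rule: swap the source-table
# relation for the index relation when the index covers all referenced columns.
from dataclasses import dataclass

@dataclass(frozen=True)
class LogicalRelation:
    table: str       # e.g. "spark_catalog.default.http_logs"
    columns: tuple   # columns the query references

@dataclass(frozen=True)
class CoveringIndex:
    source_table: str
    index_table: str  # e.g. "opensearch.default.flint_..._all_cols_idx"
    indexed_columns: tuple

def rewrite(relation, indexes):
    """Replace a source-table scan with a covering-index scan when some
    index on that table contains every column the query needs."""
    for idx in indexes:
        if (idx.source_table == relation.table
                and set(relation.columns) <= set(idx.indexed_columns)):
            return LogicalRelation(idx.index_table, relation.columns)
    return relation  # no eligible index: leave the plan unchanged
```

A query touching only indexed columns is redirected to the index table; any query referencing a column the index lacks falls through untouched, which is the "partial covering index" gap the Dev Efforts item calls out.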

Limitations

  1. User Expertise Required: Without a pre-analyze API, users must know in advance whether their query can be accelerated. Otherwise, investing in building a covering index can incur significant cost.
  2. Covering Indexes are not Updatable: If users create a covering index on a subset of the columns, there is no way to add more columns in the future.
  3. Stale Query Results:
    a. The freshness of the covering index directly impacts query accuracy, potentially leading to stale or incorrect outcomes.
    b. User manipulation of the covering index data can cause inconsistencies and partial results.
  4. Untracked Permission Changes: Changes in permissions on the source table are not reflected in the covering index data, affecting query accuracy.
  5. Restricted Predicate Pushdown: Currently, the Flint data source pushes only a limited set of predicates down to the OpenSearch DSL query.
    a. Only basic operators are supported
    b. Others, such as functions and aggregations, are not supported
  6. Performance Degradation: Large covering indexes or queries lacking effective filtering conditions may result in degraded performance. [To be verified in PoC]
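The pushdown limitation in item 5 can be illustrated with a small translator. This is a sketch under assumptions, not Flint's actual pushdown code; it only shows the shape of the problem, mapping simple comparisons to OpenSearch DSL clauses and rejecting everything else.

```python
# Illustrative predicate pushdown: basic comparison operators map cleanly to
# OpenSearch DSL term/range clauses; anything else must stay in Spark.
def predicate_to_dsl(column, op, value):
    """Translate one simple predicate into an OpenSearch DSL clause.
    Returns None for predicates this sketch cannot push down (functions,
    aggregations, pattern matching), which Spark then evaluates itself."""
    if op == "=":
        return {"term": {column: value}}
    if op in (">", ">=", "<", "<="):
        range_op = {">": "gt", ">=": "gte", "<": "lt", "<=": "lte"}[op]
        return {"range": {column: {range_op: value}}}
    return None  # unsupported operator: no pushdown
```

When the predicate cannot be pushed down, the whole index must be scanned and filtered in Spark, which is exactly the scenario item 6 flags for performance testing.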

Proof of Concept

High Priorities (~3 weeks):

  1. Catalog Integration for OpenSearch Index: Integrating the OpenSearch index into the catalog system to facilitate its utilization in the query plan post-rewrite. [Efforts will focus on query rewrite requirements and avoid full integration, a task designated for [EPIC] Zero-ETL - OpenSearch Table #185]

    • Implementing a catalog and table interface for the OpenSearch index (3 days).
    • Mapping basic OpenSearch field types to SparkSQL types (2 days).
  2. Full Covering Index Rewrite: Developing query rewriting mechanisms to utilize a full covering index for optimizing query execution.

    • Adding a rewrite rule to identify queries suitable for full covering index utilization with all columns (2 days).
    • Enhancing the rule to accommodate queries matching covering index with some columns (3 days).
  3. Performance Testing: Evaluating the effectiveness of query rewriting in reducing latency or costs under specific conditions. (~1 week)

    • Test scenarios include: queries without filtering conditions, queries with filtering conditions or aggregations that cannot be pushed down, and queries with highly selective filtering conditions.
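The type-mapping task in PoC item 1 amounts to a small translation table. The mapping below is an assumption for illustration (OpenSearch types on the left, Spark SQL type names on the right); the actual Flint mapping may differ in coverage and naming.

```python
# Assumed mapping of basic OpenSearch field types to Spark SQL type names.
OS_TO_SPARK_TYPE = {
    "keyword": "string",
    "text": "string",
    "long": "bigint",
    "integer": "int",
    "double": "double",
    "float": "float",
    "boolean": "boolean",
    "date": "timestamp",
}

def to_spark_schema(os_mapping):
    """Convert an OpenSearch index mapping's properties into (name, type)
    pairs, skipping field types this sketch does not handle (e.g. nested
    objects, which have no "type" key)."""
    return [(name, OS_TO_SPARK_TYPE[props["type"]])
            for name, props in os_mapping["properties"].items()
            if props.get("type") in OS_TO_SPARK_TYPE]
```

Fields the mapping cannot translate are silently dropped here; a real implementation would need to decide whether to fail, warn, or exclude such columns from the rewritten plan.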

Secondary Priorities (~2 weeks):

  1. Partial Covering Index Rewrite: Facilitate query rewriting with a partial covering index, incorporating a WHERE clause for more precise data retrieval.
    a. Convert WHERE clause string in Flint metadata to expression (1 day)
    b. Perform subsumption test between expressions in query and those in partial covering index definition (1 week)

  2. User Hints for Freshness: Introduce hints that users can employ to retrieve the most recent results without the need for query rewriting.
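The subsumption test in secondary item 1.b can be sketched for the simplest case: conjunctions of numeric range predicates, one closed interval per column. This is a toy model for intuition only; real subsumption testing over arbitrary expressions is the part estimated at a week.

```python
# Toy subsumption check: does the partial index's WHERE clause contain every
# row the query needs? Predicates are dicts mapping column -> (low, high)
# closed-interval bounds, with None meaning unbounded on that side.
def subsumes(index_pred, query_pred):
    """Return True if every row satisfying query_pred also satisfies
    index_pred, so the partial covering index is safe to use."""
    for col, (ilo, ihi) in index_pred.items():
        qlo, qhi = query_pred.get(col, (None, None))
        # The query's bound must be at least as tight as the index's bound.
        if ilo is not None and (qlo is None or qlo < ilo):
            return False
        if ihi is not None and (qhi is None or qhi > ihi):
            return False
    return True
```

For example, an index built with WHERE status >= 400 subsumes a query for status = 500, but not an unconstrained scan; the latter must fall back to the source table.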


penghuo commented Apr 3, 2024

One question: what blocks users from directly querying the covering index? In my understanding, we allow users to query the original table; for that, the background refresh job needs to keep the data fresh.

vamsimanohar (Member) commented:

  1. What would the rewritten query look like?
  2. Is this for improving the performance of queries on the table, or for exposing OpenSearch capabilities via Spark SQL?
  3. As you rightly pointed out, we have the data, and another copy of a subset of it is stored in a different format (Lucene). Are we looking for use cases that exploit these different storages? Is there any precedent for this type of use case in other products?


dai-chen commented Apr 4, 2024

@penghuo @vamsi-amazon Thanks for the comments!

  1. As discussed, the proposed feature is for a unified SparkSQL user experience.
  2. Updated the Example section above with the query plan before and after rewriting.
  3. This is mostly for performance improvement. Exposing full OS via SparkSQL is tracked in [EPIC] Zero-ETL - OpenSearch Table #185. (Will update the PoC item to clarify.)
  4. For the use case, I think our main motivation is as mentioned in item 1.
