Handle create index with batch FlintJob #2734
Codecov Report
All modified and coverable lines are covered by tests ✅
Additional details and impacted files
@@ Coverage Diff @@
## main #2734 +/- ##
=========================================
Coverage 95.47% 95.47%
- Complexity 5193 5197 +4
=========================================
Files 509 509
Lines 14637 14639 +2
Branches 982 983 +1
=========================================
+ Hits 13974 13976 +2
Misses 641 641
Partials 22 22
Signed-off-by: Sean Kao <[email protected]>
4e05e17 to 3204307
spark/src/main/java/org/opensearch/sql/spark/dispatcher/SparkQueryDispatcher.java
spark/src/main/java/org/opensearch/sql/spark/dispatcher/BatchQueryHandler.java
@@ -59,6 +60,8 @@ public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
  @Override
  public DispatchQueryResponse submit(
      DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
    leaseManager.borrow(new LeaseRequest(JobType.BATCH, dispatchQueryRequest.getDatasource()));
Does this warrant any change in documentation?
Are we moving this out of the parent class? Does the new create index job not need it?
Behavior doesn't change for the refresh query handler.
Also, according to https://github.com/opensearch-project/sql/blob/main/docs/user/admin/settings.rst#pluginsqueryexecutionenginesparkrefresh_joblimit
the limit is intended to affect refresh jobs only.
The new create index job doesn't need this?
DDL shouldn't be affected by the concurrent refresh job limit. Without this change, the following test would fail:
sql/spark/src/test/java/org/opensearch/sql/spark/asyncquery/IndexQuerySpecTest.java
Line 848 in 3ab7851
public void concurrentRefreshJobLimitNotAppliedToDDL() {
Have we tested this on a working stack?
Yes, in my test domain.
As I understand it, the current approach makes sure that a cancel job run won't happen on a REPL job. I think we can discuss later how to make index operations clearer. For example, instead of separating the EMR-S job, should we skip the cancel logic entirely for an index that is not in the refreshing state?
Just minor comments. Those can be addressed in a separate PR.
HashMap<String, String> tags = new HashMap<>();
tags.put(DATASOURCE_TAG_KEY, "my_glue");
tags.put(CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME);
tags.put(JOB_TYPE_TAG_KEY, JobType.BATCH.getText());
nit: consider using ImmutableMap.of
(can be future refactoring)
This whole file uses the same pattern. It can be handled in a separate refactoring PR for the next release.
new HashMap<>() {
  {
    put(FLINT_INDEX_STORE_AWSREGION_KEY, "eu-west-1");
  }
},
nit: same as above
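A rough sketch of the refactor suggested in the two nits above: both the mutable `HashMap` with `put` calls and the double-brace initializer can be replaced by an immutable map factory. The review suggests Guava's `ImmutableMap.of`; to stay dependency-free, this sketch swaps in the JDK's `Map.of` (Java 9+), which behaves similarly for this purpose. The class name `TagFixtures` and all constant values are placeholders invented for illustration, not the project's real test constants.

```java
import java.util.Map;

// Hypothetical helper; the names and values below are placeholders for illustration only.
final class TagFixtures {
  static final String DATASOURCE_TAG_KEY = "placeholder-datasource-key";
  static final String CLUSTER_NAME_TAG_KEY = "placeholder-cluster-key";
  static final String JOB_TYPE_TAG_KEY = "placeholder-job-type-key";
  static final String TEST_CLUSTER_NAME = "placeholder-cluster";
  // Stand-in for JobType.BATCH.getText(); the real text value is not shown in the PR.
  static final String BATCH_JOB_TEXT = "placeholder-batch";

  private TagFixtures() {}

  // Replaces `new HashMap<>()` followed by three put(...) calls.
  static Map<String, String> batchJobTags() {
    return Map.of(
        DATASOURCE_TAG_KEY, "my_glue",
        CLUSTER_NAME_TAG_KEY, TEST_CLUSTER_NAME,
        JOB_TYPE_TAG_KEY, BATCH_JOB_TEXT);
  }

  // Replaces the double-brace initializer, which creates an anonymous HashMap subclass.
  static Map<String, String> regionProperties(String regionKey) {
    return Map.of(regionKey, "eu-west-1");
  }
}
```

One caveat: the variable in the test is declared as `HashMap<String, String>`, so its declared type would have to widen to `Map<String, String>`, and any code that mutates the map afterwards would need to change as well.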
@@ -79,6 +79,9 @@ private AsyncQueryHandler getQueryHandlerForFlintExtensionQuery(
      return queryHandlerFactory.getIndexDMLHandler();
    } else if (isEligibleForStreamingQuery(indexQueryDetails)) {
      return queryHandlerFactory.getStreamingQueryHandler();
    } else if (IndexQueryActionType.CREATE.equals(indexQueryDetails.getIndexQueryActionType())) {
      // create should be handled by batch handler
nit: This comment might be redundant. If we keep it, should we add more context?
Will add a one-liner explaining why this should be the case.
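To make the branch order in the hunk above easier to follow, here is a minimal, self-contained sketch of the routing. It is not the project's code: the enums, the class `HandlerRouting`, and the two boolean parameters are hypothetical stand-ins for the real `IndexQueryDetails` checks, and the final `OTHER` branch is a placeholder because the diff does not show what follows the new `CREATE` branch.

```java
// Simplified stand-ins for the project's types; every name here is hypothetical.
enum ActionType { CREATE, DROP, REFRESH }

enum HandlerKind { INDEX_DML, STREAMING, BATCH, OTHER }

final class HandlerRouting {
  private HandlerRouting() {}

  // Mirrors the order of the else-if chain in the diff: index DML first,
  // then streaming, and only then the new CREATE-to-batch branch.
  static HandlerKind route(
      ActionType action, boolean handledByIndexDml, boolean eligibleForStreaming) {
    if (handledByIndexDml) {
      return HandlerKind.INDEX_DML;
    } else if (eligibleForStreaming) {
      return HandlerKind.STREAMING;
    } else if (ActionType.CREATE.equals(action)) {
      // A CREATE that was not picked up by the streaming check goes to the batch handler.
      return HandlerKind.BATCH;
    }
    // Placeholder: the remaining branches are not visible in the diff.
    return HandlerKind.OTHER;
  }
}
```

Because the streaming check runs first, a create statement that qualifies for streaming never reaches the batch branch; only the remaining create statements (per the PR description, those with auto_refresh=false) do.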
* update grammar file
* batch job for create manual refresh index
* dispatcher test for index dml query
* borrow lease for refresh query, not batch
* spotlessApply
* add release note
* update comment
---------
Signed-off-by: Sean Kao <[email protected]>
(cherry picked from commit b959039)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Description
Use a batch FlintJob to handle create index queries. FlintJob is already used for streaming queries; this PR makes creating an index with auto_refresh=false use FlintJob as well.
This resolves the bug where dropping a manual refresh index could cancel the FlintREPL job that executed the create index statement.
Alongside this change, RefreshQueryHandler now borrows the lease instead of BatchQueryHandler, because the concurrent refresh job limit shouldn't apply to batch jobs in general. The behavior for refresh queries remains the same.
The lease change is made in this same PR because, before it, no query could be used to test BatchQueryHandler.
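The lease change described above can be pictured with a small sketch. This is an illustration under assumptions, not the project's implementation: `SketchLeaseManager`, `SketchBatchHandler`, and `SketchRefreshHandler` are hypothetical, the limit is modeled as a plain counter that ignores the datasource, and the only point is that the subclass borrows a lease before delegating while the parent no longer does.

```java
// Hypothetical, simplified stand-in for the lease manager: a bare counter with a limit.
final class SketchLeaseManager {
  private final int refreshJobLimit;
  private int runningRefreshJobs;

  SketchLeaseManager(int refreshJobLimit) {
    this.refreshJobLimit = refreshJobLimit;
  }

  // Throws when the limit is reached; otherwise records one more running refresh job.
  void borrow(String datasource) {
    if (runningRefreshJobs >= refreshJobLimit) {
      throw new IllegalStateException("refresh job limit reached for " + datasource);
    }
    runningRefreshJobs++;
  }
}

// Parent handler: submits a batch job without borrowing a lease.
class SketchBatchHandler {
  String submit(String datasource, String query) {
    return "batch:" + datasource + ":" + query;
  }
}

// Child handler: borrows a lease first, then reuses the parent's submit logic.
final class SketchRefreshHandler extends SketchBatchHandler {
  private final SketchLeaseManager leaseManager;

  SketchRefreshHandler(SketchLeaseManager leaseManager) {
    this.leaseManager = leaseManager;
  }

  @Override
  String submit(String datasource, String query) {
    leaseManager.borrow(datasource);
    return super.submit(datasource, query);
  }
}
```

With a limit of 1, a second refresh submission is rejected while a create index statement routed to the plain batch handler still goes through, which is the property the PR description asks for.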
Tests
Added unit tests that cover the code paths for dispatching to IndexDMLHandler and for create index queries.
End to end test
Issues Resolved
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.