Flint query scheduler part 2 #2961
Signed-off-by: Louis Chu <[email protected]>
```java
public boolean isExternalScheduler() {
  // Default is false, which means using internal scheduler to refresh the index.
```
Internal/External sounds ambiguous to me. Can we rephrase or add documentation?
Does "internal" mean within Spark? That documentation is hard to find from this option, which is defined in the SQL plugin. I would have assumed "internal" means internal to OpenSearch...
Yeah, internal means within Spark, and external could be anything.
I don't want to block this PR now, but @ykmr1224 raised a good question.
We should name this flag from Spark's perspective. Something like `use_spark_scheduler = True | False`
would probably make more sense, because we don't actually care what the external scheduler is?
From Spark's perspective, we want to stay extensible across external schedulers in case they behave differently.
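To make the naming discussion concrete, here is a minimal sketch of reading such a setting as an enum instead of a boolean, assuming a hypothetical `scheduler_mode` index option whose values are `internal` (Spark refreshes the index itself) and `external` (some other scheduler triggers refreshes). The option key, enum, and class names are illustrative only and are not the PR's actual code.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical scheduler modes: INTERNAL = Spark refreshes the index itself. */
enum SchedulerMode {
  INTERNAL,
  EXTERNAL;

  /** Case-insensitive lookup; Enum.valueOf throws IllegalArgumentException for unknown values. */
  static SchedulerMode fromString(String value) {
    return SchedulerMode.valueOf(value.trim().toUpperCase(Locale.ROOT));
  }
}

/** Illustrative view over an index's WITH (...) options; not the real FlintIndexOptions. */
final class IndexOptionsSketch {
  // Hypothetical option key used only in this example.
  static final String SCHEDULER_MODE = "scheduler_mode";

  private final Map<String, String> options;

  IndexOptionsSketch(Map<String, String> options) {
    this.options = options;
  }

  SchedulerMode schedulerMode() {
    String raw = options.get(SCHEDULER_MODE);
    // Absent option falls back to INTERNAL, mirroring "default is false" in isExternalScheduler().
    return raw == null ? SchedulerMode.INTERNAL : SchedulerMode.fromString(raw);
  }

  boolean isExternalScheduler() {
    return schedulerMode() == SchedulerMode.EXTERNAL;
  }
}
```

With an enum, `INTERNAL` can be documented once as "Spark's own scheduler", and supporting another external backend later would mean adding a constant rather than another boolean flag.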
* async-query-core/src/test/java/org/opensearch/sql/spark/asyncquery/AsyncQueryCoreIntegTest.java (resolved)
* ...uery-core/src/test/java/org/opensearch/sql/spark/flint/operation/FlintIndexOpVacuumTest.java (resolved)
* async-query/src/main/java/org/opensearch/sql/spark/scheduler/parser/IntervalScheduleParser.java (resolved)
* async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJob.java (3 threads, outdated, resolved)
* ...ry/src/main/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequest.java (outdated, resolved)
```java
    "^(\\d+)\\s*(years?|months?|weeks?|days?|hours?|minutes?|minute|mins?|seconds?|secs?|milliseconds?|millis?|micros?|nanoseconds?|nanos?)$",
    Pattern.CASE_INSENSITIVE);

public static IntervalSchedule parse(String intervalStr, Instant startTime) {
```
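For readers following this thread, here is a minimal, self-contained sketch of what the regex above enables: split a string such as `"10 minutes"` into an amount and a unit, then normalize it to whole minutes. It reuses the pattern from the excerpt but returns a plain minute count instead of Job Scheduler's `IntervalSchedule`; the class name is hypothetical, and rounding sub-minute values up to one minute is an assumption of this sketch, not necessarily what `IntervalScheduleParser` does.

```java
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class IntervalStringSketch {
  // Same pattern as in the excerpt above.
  private static final Pattern DURATION_PATTERN =
      Pattern.compile(
          "^(\\d+)\\s*(years?|months?|weeks?|days?|hours?|minutes?|minute|mins?|seconds?|secs?|milliseconds?|millis?|micros?|nanoseconds?|nanos?)$",
          Pattern.CASE_INSENSITIVE);

  /** Parses e.g. "10 minutes" into whole minutes, rounding any remainder up. */
  static long toMinutes(String intervalStr) {
    Matcher matcher = DURATION_PATTERN.matcher(intervalStr.trim());
    if (!matcher.matches()) {
      throw new IllegalArgumentException("Invalid interval format: " + intervalStr);
    }
    long amount = Long.parseLong(matcher.group(1));
    if (amount <= 0) {
      throw new IllegalArgumentException("Interval must be positive: " + intervalStr);
    }
    ChronoUnit unit = toChronoUnit(matcher.group(2).toLowerCase(Locale.ROOT));
    // Years and months use ChronoUnit's estimated lengths (about 365.24 and 30.44 days).
    Duration duration = unit.getDuration().multipliedBy(amount);
    long minutes = duration.toMinutes();
    if (duration.compareTo(Duration.ofMinutes(minutes)) > 0) {
      minutes++; // round up partial minutes, e.g. 30 seconds -> 1 minute
    }
    return minutes;
  }

  private static ChronoUnit toChronoUnit(String unit) {
    // Prefix checks: "min" matches minute/mins but not milli/micro.
    if (unit.startsWith("year")) return ChronoUnit.YEARS;
    if (unit.startsWith("month")) return ChronoUnit.MONTHS;
    if (unit.startsWith("week")) return ChronoUnit.WEEKS;
    if (unit.startsWith("day")) return ChronoUnit.DAYS;
    if (unit.startsWith("hour")) return ChronoUnit.HOURS;
    if (unit.startsWith("min")) return ChronoUnit.MINUTES;
    if (unit.startsWith("sec")) return ChronoUnit.SECONDS;
    if (unit.startsWith("milli")) return ChronoUnit.MILLIS;
    if (unit.startsWith("micro")) return ChronoUnit.MICROS;
    if (unit.startsWith("nano")) return ChronoUnit.NANOS;
    throw new IllegalArgumentException("Unsupported unit: " + unit);
  }
}
```

Under the round-up assumption, `toMinutes("90 secs")` yields 2 and `toMinutes("1 Week")` yields 10080.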
Is it the right decision to implement this ourselves? Is there a common format and library for time durations?
> Is it the right decision to implement this ourselves? Is there a common format and library for time durations?

The problem here is converting a Spark CalendarInterval into an OpenSearch Job Scheduler interval.
Why do we use Spark CalendarInterval?
Spark's CalendarInterval is the interval format the Spark scheduler uses; we will need it when converting a streaming job to the external scheduler.
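Spark's `CalendarInterval` stores months, days, and microseconds as separate fields because months and days do not have a fixed length, while a fixed-interval scheduler needs a single duration. The sketch below shows one way to collapse the three fields into whole minutes. It does not depend on Spark: `CalendarIntervalLike` is a hypothetical stand-in that mirrors those fields, and treating a month as exactly 30 days is an assumption of this sketch.

```java
import java.util.concurrent.TimeUnit;

/** Mirrors the three fields of Spark's CalendarInterval; this is not the Spark class. */
final class CalendarIntervalLike {
  final int months;
  final int days;
  final long microseconds;

  CalendarIntervalLike(int months, int days, long microseconds) {
    this.months = months;
    this.days = days;
    this.microseconds = microseconds;
  }
}

final class CalendarIntervalConverter {
  // Assumption for this sketch: one month = 30 days, one day = 24 hours.
  private static final long DAYS_PER_MONTH = 30;
  private static final long MICROS_PER_DAY = TimeUnit.DAYS.toMicros(1);
  private static final long MICROS_PER_MINUTE = TimeUnit.MINUTES.toMicros(1);

  /** Converts to whole minutes for a fixed-interval scheduler, rounding up. */
  static long toMinutes(CalendarIntervalLike interval) {
    long totalDays =
        Math.addExact(Math.multiplyExact(interval.months, DAYS_PER_MONTH), interval.days);
    long totalMicros =
        Math.addExact(Math.multiplyExact(totalDays, MICROS_PER_DAY), interval.microseconds);
    if (totalMicros <= 0) {
      throw new IllegalArgumentException("Refresh interval must be positive");
    }
    // Ceiling division so a sub-minute interval still maps to a 1-minute schedule.
    return (totalMicros + MICROS_PER_MINUTE - 1) / MICROS_PER_MINUTE;
  }
}
```

The `*Exact` arithmetic makes overflow fail loudly instead of silently wrapping into a nonsensical schedule.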
* ...rc/test/java/org/opensearch/sql/spark/scheduler/model/ScheduledAsyncQueryJobRequestTest.java (outdated, resolved)
* async-query-core/src/main/java/org/opensearch/sql/spark/scheduler/AsyncQueryScheduler.java (2 threads, resolved)
LGTM.
Are we tracking, as a separate issue, the optimization of using Spark's scheduler for shorter refresh intervals?
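One way to frame the optimization asked about here is a simple threshold: refresh intervals shorter than some cutoff stay on Spark's own scheduler, and longer ones are triggered by the external scheduler. The enum, class, and cutoff below are hypothetical; a real threshold would depend on factors such as job start-up cost.

```java
import java.time.Duration;

/** Hypothetical targets for running an index refresh. */
enum RefreshScheduler {
  SPARK,    // keep the refresh inside Spark (e.g. a long-running streaming job)
  EXTERNAL  // trigger each refresh from an external scheduler
}

final class SchedulerSelector {
  private final Duration sparkThreshold;

  SchedulerSelector(Duration sparkThreshold) {
    this.sparkThreshold = sparkThreshold;
  }

  /** Intervals strictly shorter than the threshold stay on Spark's scheduler. */
  RefreshScheduler choose(Duration refreshInterval) {
    return refreshInterval.compareTo(sparkThreshold) < 0
        ? RefreshScheduler.SPARK
        : RefreshScheduler.EXTERNAL;
  }
}
```

For example, with a 5-minute cutoff, a 1-minute refresh would stay on Spark, while a 15-minute refresh would go to the external scheduler.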
```java
      LOGGER.error(throwable);
    }
  };
threadPool.generic().submit(runnable);
```
minor: Is it considered best practice to submit tasks to a generic thread pool across all job scheduler clients?
addressed in f9fe064
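The PR resolved this by switching to the SQL plugin's own thread pool (the "Use SQL thread pool" commit). The sketch below illustrates the general idea with a plain JDK executor rather than OpenSearch's `ThreadPool` API: scheduled jobs run on a dedicated, named pool, and failures are caught and logged inside the task, much like the `LOGGER.error(throwable)` in the excerpt above. The class and pool names are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Runs scheduled jobs on a dedicated, named pool instead of a shared generic one. */
final class JobRunnerSketch {
  private static final Logger LOGGER = Logger.getLogger(JobRunnerSketch.class.getName());
  private final ExecutorService executor;

  JobRunnerSketch(String poolName, int threads) {
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory =
        runnable -> {
          Thread thread = new Thread(runnable, poolName + "-" + counter.incrementAndGet());
          thread.setDaemon(true); // do not keep the JVM alive just for this pool
          return thread;
        };
    this.executor = Executors.newFixedThreadPool(threads, factory);
  }

  /** Submits a job; failures are logged instead of sitting unobserved in the Future. */
  Future<?> runJob(Runnable job) {
    Runnable guarded =
        () -> {
          try {
            job.run();
          } catch (Throwable throwable) {
            LOGGER.log(Level.SEVERE, "Scheduled job failed", throwable);
          }
        };
    return executor.submit(guarded);
  }

  void shutdown() {
    executor.shutdown();
  }
}
```

A dedicated pool keeps slow or failing scheduler jobs from starving other users of a shared pool, and named threads make them easy to spot in thread dumps.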
* Flint query scheduler part 2
* spotless apply
* Add UT
* Resolve comments
* Add more UTs
* Resolve comments
* Use SQL thread pool

Signed-off-by: Louis Chu <[email protected]>
(cherry picked from commit 729bb13)
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
* Flint query scheduler part 2
* spotless apply
* Add UT
* Resolve comments
* Add more UTs
* Resolve comments
* Use SQL thread pool

(cherry picked from commit 729bb13)
Signed-off-by: Louis Chu <[email protected]>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Description
* `AsyncQueryScheduler` interface, with `AsyncQuerySchedulerRequest` model
* `AsyncQueryScheduler` to `FlintIndexOpFactory` to support `DROP`, `VACUUM`, and `ALTER ... WITH (auto_refresh = false)`
* `AsyncQueryScheduler` … `flintIndexOpFactory` so it integrates seamlessly

Local sanity test
* Drop should unschedule
* Vacuum should remove schedule
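The expected behavior checked in the sanity test can be sketched with an in-memory scheduler: `DROP` (and `ALTER ... WITH (auto_refresh = false)`) stop future runs but keep the job definition, while `VACUUM` deletes the job entirely. The interface and method names below are hypothetical and only loosely modeled on the PR's `AsyncQueryScheduler`; the real signatures take request objects and talk to a scheduler backend.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical scheduler contract, loosely modeled on the PR's AsyncQueryScheduler. */
interface QuerySchedulerSketch {
  void scheduleJob(String jobId, long intervalMinutes);

  /** Stops future runs but keeps the job definition. */
  void unscheduleJob(String jobId);

  /** Deletes the job definition entirely. */
  void removeJob(String jobId);
}

/** In-memory stand-in for a real scheduler backend. */
final class InMemoryQueryScheduler implements QuerySchedulerSketch {
  private static final class Job {
    final long intervalMinutes;
    boolean enabled = true;

    Job(long intervalMinutes) {
      this.intervalMinutes = intervalMinutes;
    }
  }

  private final Map<String, Job> jobs = new HashMap<>();

  @Override
  public void scheduleJob(String jobId, long intervalMinutes) {
    jobs.put(jobId, new Job(intervalMinutes));
  }

  @Override
  public void unscheduleJob(String jobId) {
    Job job = jobs.get(jobId);
    if (job != null) {
      job.enabled = false;
    }
  }

  @Override
  public void removeJob(String jobId) {
    jobs.remove(jobId);
  }

  /** Empty if the job does not exist; otherwise whether it is still enabled. */
  Optional<Boolean> isEnabled(String jobId) {
    return Optional.ofNullable(jobs.get(jobId)).map(job -> job.enabled);
  }
}

/** Hypothetical index operations showing where the scheduler calls go. */
final class FlintIndexOpsSketch {
  private final QuerySchedulerSketch scheduler;

  FlintIndexOpsSketch(QuerySchedulerSketch scheduler) {
    this.scheduler = scheduler;
  }

  void drop(String indexName) {
    scheduler.unscheduleJob(indexName); // DROP: stop refreshing, keep the job definition
  }

  void disableAutoRefresh(String indexName) {
    scheduler.unscheduleJob(indexName); // ALTER ... WITH (auto_refresh = false)
  }

  void vacuum(String indexName) {
    scheduler.removeJob(indexName); // VACUUM: delete the job definition as well
  }
}
```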
Related Issues
#2833
Follow-ups
* `RefreshQueryHandler` doesn't check whether an index is already refreshing before submitting an EMR job; with the scheduler, this can be optimized.

Check List
* … `--signoff`.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.
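The `RefreshQueryHandler` follow-up listed in the description could be addressed with a guard that atomically moves an index from an active state to a refreshing state before a job is submitted, and skips the submission if another refresh already holds it. In this sketch an in-memory `ConcurrentHashMap` stands in for whatever conditional update the real index state store would need; the enum values are a hypothetical subset of Flint's index states, and all names are illustrative.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical subset of index states; the real Flint state machine has more. */
enum IndexStateSketch {
  ACTIVE,
  REFRESHING,
  DELETED
}

final class RefreshGuardSketch {
  private final Map<String, IndexStateSketch> states = new ConcurrentHashMap<>();

  void setState(String indexName, IndexStateSketch state) {
    states.put(indexName, state);
  }

  /** Atomically moves ACTIVE -> REFRESHING; false means skip submitting another job. */
  boolean tryStartRefresh(String indexName) {
    return states.replace(indexName, IndexStateSketch.ACTIVE, IndexStateSketch.REFRESHING);
  }

  /** Returns the index to ACTIVE once the refresh job completes. */
  void finishRefresh(String indexName) {
    states.replace(indexName, IndexStateSketch.REFRESHING, IndexStateSketch.ACTIVE);
  }
}
```

A scheduled trigger would call `tryStartRefresh` first and submit the EMR job only when it returns `true`; indexes that are missing or not `ACTIVE` are skipped as well.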