-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Concurrent replace should work with supervisors using concurrent locks #15995
Concurrent replace should work with supervisors using concurrent locks #15995
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments to make the flow easy to understand.
Boolean useConcurrentLocks = Tasks.DEFAULT_USE_CONCURRENT_LOCKS; | ||
TaskLockType taskLockType = Tasks.DEFAULT_TASK_LOCK_TYPE; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Boolean useConcurrentLocks = Tasks.DEFAULT_USE_CONCURRENT_LOCKS; | |
TaskLockType taskLockType = Tasks.DEFAULT_TASK_LOCK_TYPE; | |
boolean hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS; |
} | ||
} | ||
|
||
if (supervisor instanceof SeekableStreamSupervisor | ||
&& !supervisorSpec.isSuspended() | ||
&& supervisorSpec.getDataSources().contains(datasource) | ||
&& TaskLockType.APPEND.equals(taskLockType)) { | ||
&& (useConcurrentLocks || TaskLockType.APPEND.equals(taskLockType))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
&& (useConcurrentLocks || TaskLockType.APPEND.equals(taskLockType))) { | |
&& hasAppendLock) { |
useConcurrentLocks = QueryContexts.getAsBoolean( | ||
Tasks.USE_CONCURRENT_LOCKS, | ||
context.get(Tasks.USE_CONCURRENT_LOCKS) | ||
); | ||
|
||
if (useConcurrentLocks == null) { | ||
useConcurrentLocks = false; | ||
taskLockType = QueryContexts.getAsEnum( | ||
Tasks.TASK_LOCK_TYPE, | ||
context.get(Tasks.TASK_LOCK_TYPE), | ||
TaskLockType.class | ||
); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
useConcurrentLocks = QueryContexts.getAsBoolean( | |
Tasks.USE_CONCURRENT_LOCKS, | |
context.get(Tasks.USE_CONCURRENT_LOCKS) | |
); | |
if (useConcurrentLocks == null) { | |
useConcurrentLocks = false; | |
taskLockType = QueryContexts.getAsEnum( | |
Tasks.TASK_LOCK_TYPE, | |
context.get(Tasks.TASK_LOCK_TYPE), | |
TaskLockType.class | |
); | |
} | |
Boolean useConcurrentLocks = QueryContexts.getAsBoolean( | |
Tasks.USE_CONCURRENT_LOCKS, | |
context.get(Tasks.USE_CONCURRENT_LOCKS) | |
); | |
if (useConcurrentLocks == null) { | |
TaskLockType taskLockType = QueryContexts.getAsEnum( | |
Tasks.TASK_LOCK_TYPE, | |
context.get(Tasks.TASK_LOCK_TYPE), | |
TaskLockType.class | |
); | |
if (taskLockType == null) { | |
hasAppendLock = Tasks.DEFAULT_USE_CONCURRENT_LOCKS; | |
} else if (taskLockType == TaskLockType.APPEND) { | |
hasAppendLock = true; | |
} else { | |
hasAppendLock = false; | |
} | |
} else { | |
hasAppendLock = useConcurrentLocks; | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the fix, @AmatyaAvadhanula !
Thank you for the review @kfaraz |
apache#15995) * Concurrent replace should work with supervisors using concurrent locks * Ignore supervisors with useConcurrentLocks set to false * Apply feedback
apache#15995) * Concurrent replace should work with supervisors using concurrent locks * Ignore supervisors with useConcurrentLocks set to false * Apply feedback
* docs: add mermaid diagram support * fix crash when parsing data in data loader that can not be parsed (#15983) * update jetty to address CVE (#16000) * Concurrent replace should work with supervisors using concurrent locks (#15995) * Concurrent replace should work with supervisors using concurrent locks * Ignore supervisors with useConcurrentLocks set to false * Apply feedback * Add pre-check for heavy debug logs (#15706) Co-authored-by: Kashif Faraz <[email protected]> Co-authored-by: Benedict Jin <[email protected]> * Remove helm paths from CodeQL config (#16006) * docs: mention acid-compliance for metadb --------- Co-authored-by: Vadim Ogievetsky <[email protected]> Co-authored-by: Jan Werner <[email protected]> Co-authored-by: AmatyaAvadhanula <[email protected]> Co-authored-by: Sensor <[email protected]> Co-authored-by: Kashif Faraz <[email protected]> Co-authored-by: Benedict Jin <[email protected]>
A concurrent replace should allow the existing pending segments of a peon to be upgraded via the supervisor.
Currently, we check if there is such an active supervisor for streaming ingestion when the context value of
taskLockType
:APPEND
.This PR ensures that
useConcurrentLocks
:true
works as well.Please note that if
useConcurrentLocks
is set tofalse
, it would supersedetaskLockType
:APPEND
This PR has: