-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Transform] improve performance by using point in time API for search #74984
Conversation
Pinging @elastic/ml-core (Team:ML) |
691872c
to
4737c80
Compare
retest this please |
4737c80
to
d501166
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
...sform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java
Outdated
Show resolved
Hide resolved
...sform/src/main/java/org/elasticsearch/xpack/transform/transforms/ClientTransformIndexer.java
Outdated
Show resolved
Hide resolved
if (getNextCheckpoint().getCheckpoint() != pitCheckpoint) { | ||
closePointInTime(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems to me that we should not move this execution thread forward until the pit
is closed.
It is conceivable right (though unlikely) that this closePointInTime()
is executing, but doSearch
is being handled and consequently, we close the wrong PIT and leave one left over.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pit is "copied" (not literally, but the reference) and set to null
in the sync part of closePointInTime()
, see line 470++. So you are right that we might open a new pit while still closing the other, however that's allowed and I don't see a race condition that could lead to mixing up the two.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hendrikmuhs 100%, I misread the method. Setting a local variable synchronously should avoid that problem :).
ActionListener.wrap(response -> { logger.trace("[{}] closed pit search context [{}]", getJobId(), oldPit); }, e -> { | ||
// note: closing the pit should never throw, even if the pit is invalid | ||
logger.error(new ParameterizedMessage("[{}] Failed to close point in time reader", getJobId()), e); | ||
}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the logger.trace
should have a message supplier like () -> new ParameterizedMessage
to prevent strings from being created when trace
is disabled.
Not a huge deal as this is not a "hot path"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this applies. This is only a problem if one or more arguments needs to be constructed, e.g. if getJobId()
would build the id and therefore execute something. This is not the case.
The message string itself gets only constructed after the check whether trace is enabled or not.
I wish we have static code analysis for this this, it's such a common problem.
pit = new PointInTimeBuilder(response.getPointInTimeId()).setKeepAlive(PIT_KEEP_ALIVE); | ||
searchRequest.source().pointInTimeBuilder(pit); | ||
pitCheckpoint = getNextCheckpoint().getCheckpoint(); | ||
logger.trace("[{}] using pit search context with id [{}]", getJobId(), pit.getEncodedId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Similar comment () -> new ParameterizedMessage
seems better to me for trace
private static final Logger logger = LogManager.getLogger(ClientTransformIndexer.class); | ||
|
||
private final Client client; | ||
private final AtomicBoolean oldStatsCleanedUp = new AtomicBoolean(false); | ||
|
||
private final AtomicReference<SeqNoPrimaryTermAndIndex> seqNoPrimaryTermAndIndex; | ||
private PointInTimeBuilder pit; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does pit
and disablePit
need to be volatile
? They are accessed from separate threads in different execution paths.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pit is always accessed by the indexer thread, even the onStop
call originates from the indexer, not from the _stop
transport if that's what you mean
But I am unsure, the async behavior of the indexer might indeed be problematic in this case. I will check other variables, too.
@elasticmachine update branch |
if (getNextCheckpoint().getCheckpoint() != pitCheckpoint) { | ||
closePointInTime(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hendrikmuhs 100%, I misread the method. Setting a local variable synchronously should avoid that problem :).
Test resultsUsing This chart compares a baseline (indexing without doing any searches/transforms), transform without pit ( The usage of It depends: This is not a representative benchmark. The benefit of using In summary you might not see a resource reduction in the same order of magnitude with your data, however pit should reduce overhead for all use cases where the source index isn't static (continuous transform). |
…earch (#75333) Use point in time API for every checkpoint in transform. Using point in time reduces pressure on the source indexes, e.g. less refreshes. In case, pit isn't supported (e.g. when searching remote clusters) it falls back to ordinary search requests as before. closes #73481 backport #74984
…elastic#74984) Use point in time API for every checkpoint in transform. Using point in time reduces pressure on the source indexes, e.g. less refreshes. In case, pit isn't supported (e.g. when searching remote clusters) it falls back to ordinary search requests as before. closes elastic#73481
Fix a unreleased regression introduced in #74984. In case a pit search context disappeared the listener was called twice and the transform fails.
…c#75615) Fix a unreleased regression introduced in elastic#74984. In case a pit search context disappeared the listener was called twice and the transform fails.
…c#75615) Fix a unreleased regression introduced in elastic#74984. In case a pit search context disappeared the listener was called twice and the transform fails.
Use point in time API for every checkpoint in transform. Using point in time reduces pressure on the source indexes, e.g. less refreshes. In case, pit isn't supported (e.g. when searching remote clusters) it falls back to ordinary search requests as before.
closes #73481