Skip to content

Commit

Permalink
Ensure validation of the reader context is executed first (#61831)
Browse files Browse the repository at this point in the history
This change makes sure that reader context is validated (`SearchOperationListener#validateReaderContext)
before any other operation and that it is correctly recycled or removed at the end of the operation.
This commit also fixes a race condition bug that would allocate the security reader for scrolls more than once.

Relates #61446

Co-authored-by: Nhat Nguyen <[email protected]>
  • Loading branch information
jimczi and dnhatn committed Sep 10, 2020
1 parent 44bd4a6 commit 4d528e9
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -547,24 +547,27 @@ public final void onPhaseFailure(SearchPhase phase, String msg, Throwable cause)
}

/**
* This method should be called if a search phase failed to ensure all relevant search contexts and resources are released.
* this method will also notify the listener and sends back a failure to the user.
* This method should be called if a search phase failed to ensure all relevant reader contexts are released.
* This method will also notify the listener and sends back a failure to the user.
*
* @param exception the exception explaining or causing the phase failure
*/
private void raisePhaseFailure(SearchPhaseExecutionException exception) {
results.getSuccessfulResults().forEach((entry) -> {
if (entry.getContextId() != null) {
try {
SearchShardTarget searchShardTarget = entry.getSearchShardTarget();
Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
sendReleaseSearchContext(entry.getContextId(), connection, searchShardTarget.getOriginalIndices());
} catch (Exception inner) {
inner.addSuppressed(exception);
logger.trace("failed to release context", inner);
// we don't release persistent readers (point in time).
if (request.pointInTimeBuilder() == null) {
results.getSuccessfulResults().forEach((entry) -> {
if (entry.getContextId() != null) {
try {
SearchShardTarget searchShardTarget = entry.getSearchShardTarget();
Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
sendReleaseSearchContext(entry.getContextId(), connection, searchShardTarget.getOriginalIndices());
} catch (Exception inner) {
inner.addSuppressed(exception);
logger.trace("failed to release context", inner);
}
}
}
});
});
}
listener.onFailure(exception);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,11 +98,13 @@ public void onFailure(Exception exception) {
progressListener.notifyQueryFailure(shardIndex, searchShardTarget, exception);
counter.onFailure(shardIndex, searchShardTarget, exception);
} finally {
// the query might not have been executed at all (for example because thread pool rejected
// execution) and the search context that was created in dfs phase might not be released.
// release it again to be in the safe side
context.sendReleaseSearchContext(
querySearchRequest.contextId(), connection, searchShardTarget.getOriginalIndices());
if (context.getRequest().pointInTimeBuilder() == null) {
// the query might not have been executed at all (for example because thread pool rejected
// execution) and the search context that was created in dfs phase might not be released.
// release it again to be in the safe side
context.sendReleaseSearchContext(
querySearchRequest.contextId(), connection, searchShardTarget.getOriginalIndices());
}
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,11 @@ public void onFailure(Exception e) {
* Releases shard targets that are not used in the docsIdsToLoad.
*/
private void releaseIrrelevantSearchContext(QuerySearchResult queryResult) {
// we only release search context that we did not fetch from if we are not scrolling
// and if it has at lease one hit that didn't make it to the global topDocs
if (context.getRequest().scroll() == null &&
context.getRequest().pointInTimeBuilder() == null &&
queryResult.hasSearchContext()) {
// we only release search context that we did not fetch from, if we are not scrolling
// or using a PIT and if it has at least one hit that didn't make it to the global topDocs
if (queryResult.hasSearchContext()
&& context.getRequest().scroll() == null
&& context.getRequest().pointInTimeBuilder() == null) {
try {
SearchShardTarget searchShardTarget = queryResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ default void onFreeScrollContext(ReaderContext readerContext) {}
* @param readerContext The reader context used by this request.
* @param transportRequest the request that is going to use the search context
*/
default void validateSearchContext(ReaderContext readerContext, TransportRequest transportRequest) {}
default void validateReaderContext(ReaderContext readerContext, TransportRequest transportRequest) {}

/**
* A Composite listener that multiplexes calls to each of the listeners methods.
Expand Down Expand Up @@ -238,11 +238,11 @@ public void onFreeScrollContext(ReaderContext readerContext) {
}

@Override
public void validateSearchContext(ReaderContext readerContext, TransportRequest request) {
public void validateReaderContext(ReaderContext readerContext, TransportRequest request) {
Exception exception = null;
for (SearchOperationListener listener : listeners) {
try {
listener.validateSearchContext(readerContext, request);
listener.validateReaderContext(readerContext, request);
} catch (Exception e) {
exception = ExceptionsHelper.useOrSuppress(exception, e);
}
Expand Down
Loading

0 comments on commit 4d528e9

Please sign in to comment.