Skip to content

Commit

Permalink
Remove needless sending of OriginalIndices in SearchFreeContextRequest (
Browse files Browse the repository at this point in the history
elastic#117245)

We don't need to use this request, the handler for freeing of scroll requests literally goes
to the same transport handler and doesn't come with the list of indices.
The original security need for keeping the list of indices around is long gone.
  • Loading branch information
original-brownbear authored Dec 13, 2024
1 parent 908cf9a commit b4610c8
Show file tree
Hide file tree
Showing 10 changed files with 44 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -556,11 +556,7 @@ public void testUpdateSettings() {
}

public void testSearchQueryThenFetch() throws Exception {
interceptTransportActions(
SearchTransportService.QUERY_ACTION_NAME,
SearchTransportService.FETCH_ID_ACTION_NAME,
SearchTransportService.FREE_CONTEXT_ACTION_NAME
);
interceptTransportActions(SearchTransportService.QUERY_ACTION_NAME, SearchTransportService.FETCH_ID_ACTION_NAME);

String[] randomIndicesOrAliases = randomIndicesOrAliases();
for (int i = 0; i < randomIndicesOrAliases.length; i++) {
Expand All @@ -580,16 +576,13 @@ public void testSearchQueryThenFetch() throws Exception {
SearchTransportService.QUERY_ACTION_NAME,
SearchTransportService.FETCH_ID_ACTION_NAME
);
// free context messages are not necessarily sent, but if they are, check their indices
assertIndicesSubsetOptionalRequests(Arrays.asList(searchRequest.indices()), SearchTransportService.FREE_CONTEXT_ACTION_NAME);
}

public void testSearchDfsQueryThenFetch() throws Exception {
interceptTransportActions(
SearchTransportService.DFS_ACTION_NAME,
SearchTransportService.QUERY_ID_ACTION_NAME,
SearchTransportService.FETCH_ID_ACTION_NAME,
SearchTransportService.FREE_CONTEXT_ACTION_NAME
SearchTransportService.FETCH_ID_ACTION_NAME
);

String[] randomIndicesOrAliases = randomIndicesOrAliases();
Expand All @@ -611,8 +604,6 @@ public void testSearchDfsQueryThenFetch() throws Exception {
SearchTransportService.QUERY_ID_ACTION_NAME,
SearchTransportService.FETCH_ID_ACTION_NAME
);
// free context messages are not necessarily sent, but if they are, check their indices
assertIndicesSubsetOptionalRequests(Arrays.asList(searchRequest.indices()), SearchTransportService.FREE_CONTEXT_ACTION_NAME);
}

private static void assertSameIndices(IndicesRequest originalRequest, String... actions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) {
try {
SearchShardTarget searchShardTarget = entry.getSearchShardTarget();
Transport.Connection connection = getConnection(searchShardTarget.getClusterAlias(), searchShardTarget.getNodeId());
sendReleaseSearchContext(entry.getContextId(), connection, getOriginalIndices(entry.getShardIndex()));
sendReleaseSearchContext(entry.getContextId(), connection);
} catch (Exception inner) {
inner.addSuppressed(exception);
logger.trace("failed to release context", inner);
Expand All @@ -727,10 +727,10 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) {
* @see org.elasticsearch.search.fetch.FetchSearchResult#getContextId()
*
*/
void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connection connection, OriginalIndices originalIndices) {
void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connection connection) {
assert isPartOfPointInTime(contextId) == false : "Must not release point in time context [" + contextId + "]";
if (connection != null) {
searchTransportService.sendFreeContext(connection, contextId, originalIndices);
searchTransportService.sendFreeContext(connection, contextId, ActionListener.noop());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,7 @@ public void onFailure(Exception exception) {
// the query might not have been executed at all (for example because thread pool rejected
// execution) and the search context that was created in dfs phase might not be released.
// release it again to be in the safe side
context.sendReleaseSearchContext(
querySearchRequest.contextId(),
connection,
context.getOriginalIndices(shardIndex)
);
context.sendReleaseSearchContext(querySearchRequest.contextId(), connection);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,7 @@ protected static void releaseIrrelevantSearchContext(SearchPhaseResult searchPha
context.getLogger().trace("trying to release search context [{}]", phaseResult.getContextId());
SearchShardTarget shardTarget = phaseResult.getSearchShardTarget();
Transport.Connection connection = context.getConnection(shardTarget.getClusterAlias(), shardTarget.getNodeId());
context.sendReleaseSearchContext(
phaseResult.getContextId(),
connection,
context.getOriginalIndices(phaseResult.getShardIndex())
);
context.sendReleaseSearchContext(phaseResult.getContextId(), connection);
} catch (Exception e) {
context.getLogger().trace("failed to release context", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,10 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.get.TransportGetTaskAction;
import org.elasticsearch.action.support.ChannelActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.OriginSettingClient;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -124,24 +122,6 @@ public SearchTransportService(
this.responseWrapper = responseWrapper;
}

private static final ActionListenerResponseHandler<SearchFreeContextResponse> SEND_FREE_CONTEXT_LISTENER =
new ActionListenerResponseHandler<>(
ActionListener.noop(),
SearchFreeContextResponse::readFrom,
TransportResponseHandler.TRANSPORT_WORKER
);

public void sendFreeContext(Transport.Connection connection, final ShardSearchContextId contextId, OriginalIndices originalIndices) {
transportService.sendRequest(
connection,
FREE_CONTEXT_ACTION_NAME,
new SearchFreeContextRequest(originalIndices, contextId),
TransportRequestOptions.EMPTY,
// no need to respond if it was freed or not
SEND_FREE_CONTEXT_LISTENER
);
}

public void sendFreeContext(
Transport.Connection connection,
ShardSearchContextId contextId,
Expand Down Expand Up @@ -370,43 +350,6 @@ private static class ClearScrollContextsRequest extends TransportRequest {
}
}

static class SearchFreeContextRequest extends ScrollFreeContextRequest implements IndicesRequest {
private final OriginalIndices originalIndices;

SearchFreeContextRequest(OriginalIndices originalIndices, ShardSearchContextId id) {
super(id);
this.originalIndices = originalIndices;
}

SearchFreeContextRequest(StreamInput in) throws IOException {
super(in);
originalIndices = OriginalIndices.readOriginalIndices(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
OriginalIndices.writeOriginalIndices(originalIndices, out);
}

@Override
public String[] indices() {
if (originalIndices == null) {
return null;
}
return originalIndices.indices();
}

@Override
public IndicesOptions indicesOptions() {
if (originalIndices == null) {
return null;
}
return originalIndices.indicesOptions();
}

}

public static class SearchFreeContextResponse extends TransportResponse {

private static final SearchFreeContextResponse FREED = new SearchFreeContextResponse(true);
Expand Down Expand Up @@ -456,12 +399,13 @@ public static void registerRequestHandler(TransportService transportService, Sea
SearchFreeContextResponse::readFrom
);

transportService.registerRequestHandler(
FREE_CONTEXT_ACTION_NAME,
freeContextExecutor,
SearchFreeContextRequest::new,
freeContextHandler
);
// TODO: remove this handler once the lowest compatible version stops using it
transportService.registerRequestHandler(FREE_CONTEXT_ACTION_NAME, freeContextExecutor, in -> {
var res = new ScrollFreeContextRequest(in);
// this handler exists for BwC purposes only, we don't need the original indices to free the context
OriginalIndices.readOriginalIndices(in);
return res;
}, freeContextHandler);
TransportActionProxy.registerProxyAction(transportService, FREE_CONTEXT_ACTION_NAME, false, SearchFreeContextResponse::readFrom);

transportService.registerRequestHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,7 @@ long buildTookInMillis() {
}

@Override
public void sendReleaseSearchContext(
ShardSearchContextId contextId,
Transport.Connection connection,
OriginalIndices originalIndices
) {
public void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connection connection) {
releasedContexts.add(contextId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ protected void executePhaseOnShard(
}

@Override
public void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connection connection, OriginalIndices originalIndices) {
public void sendReleaseSearchContext(ShardSearchContextId contextId, Transport.Connection connection) {
releasedSearchContexts.add(contextId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,11 @@ public void testFanOutAndCollect() throws InterruptedException {
AtomicInteger numFreedContext = new AtomicInteger();
SearchTransportService transportService = new SearchTransportService(null, null, null) {
@Override
public void sendFreeContext(Transport.Connection connection, ShardSearchContextId contextId, OriginalIndices originalIndices) {
public void sendFreeContext(
Transport.Connection connection,
ShardSearchContextId contextId,
ActionListener<SearchFreeContextResponse> listener
) {
numFreedContext.incrementAndGet();
assertTrue(nodeToContextMap.containsKey(connection.getNode()));
assertTrue(nodeToContextMap.get(connection.getNode()).remove(contextId));
Expand Down Expand Up @@ -363,7 +367,7 @@ public void run() {
for (int i = 0; i < results.getNumShards(); i++) {
TestSearchPhaseResult result = results.getAtomicArray().get(i);
assertEquals(result.node.getId(), result.getSearchShardTarget().getNodeId());
sendReleaseSearchContext(result.getContextId(), new MockConnection(result.node), OriginalIndices.NONE);
sendReleaseSearchContext(result.getContextId(), new MockConnection(result.node));
}
responseListener.onResponse(testResponse);
if (latchTriggered.compareAndSet(false, true) == false) {
Expand Down Expand Up @@ -421,8 +425,13 @@ public void testFanOutAndFail() throws InterruptedException {
);
AtomicInteger numFreedContext = new AtomicInteger();
SearchTransportService transportService = new SearchTransportService(null, null, null) {

@Override
public void sendFreeContext(Transport.Connection connection, ShardSearchContextId contextId, OriginalIndices originalIndices) {
public void sendFreeContext(
Transport.Connection connection,
ShardSearchContextId contextId,
ActionListener<SearchFreeContextResponse> listener
) {
assertNotNull(contextId);
numFreedContext.incrementAndGet();
assertTrue(nodeToContextMap.containsKey(connection.getNode()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.action.search.SearchTransportService.FREE_CONTEXT_ACTION_NAME;
import static org.elasticsearch.action.search.SearchTransportService.FREE_CONTEXT_SCROLL_ACTION_NAME;
import static org.elasticsearch.cluster.coordination.ClusterBootstrapService.INITIAL_MASTER_NODES_SETTING;
import static org.elasticsearch.discovery.SettingsBasedSeedHostsProvider.DISCOVERY_SEED_HOSTS_SETTING;
import static org.elasticsearch.test.NodeRoles.dataNode;
Expand Down Expand Up @@ -482,7 +482,7 @@ protected void ensureNoInitializingShards() {
*/
protected void ensureAllFreeContextActionsAreConsumed() throws Exception {
logger.info("--> waiting for all free_context tasks to complete within a reasonable time");
safeGet(clusterAdmin().prepareListTasks().setActions(FREE_CONTEXT_ACTION_NAME + "*").setWaitForCompletion(true).execute());
safeGet(clusterAdmin().prepareListTasks().setActions(FREE_CONTEXT_SCROLL_ACTION_NAME + "*").setWaitForCompletion(true).execute());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,19 @@ public class RBACEngine implements AuthorizationEngine {
private static final String DELETE_SUB_REQUEST_REPLICA = TransportDeleteAction.NAME + "[r]";

private static final Logger logger = LogManager.getLogger(RBACEngine.class);

private static final Set<String> SCROLL_RELATED_ACTIONS = Set.of(
TransportSearchScrollAction.TYPE.name(),
SearchTransportService.FETCH_ID_SCROLL_ACTION_NAME,
SearchTransportService.QUERY_FETCH_SCROLL_ACTION_NAME,
SearchTransportService.QUERY_SCROLL_ACTION_NAME,
SearchTransportService.FREE_CONTEXT_ACTION_NAME,
SearchTransportService.FREE_CONTEXT_SCROLL_ACTION_NAME,
TransportClearScrollAction.NAME,
"indices:data/read/sql/close_cursor",
SearchTransportService.CLEAR_SCROLL_CONTEXTS_ACTION_NAME
);

private final Settings settings;
private final CompositeRolesStore rolesStore;
private final FieldPermissionsCache fieldPermissionsCache;
Expand Down Expand Up @@ -320,7 +333,7 @@ public void authorizeIndexAction(
// need to validate that the action is allowed and then move on
listener.onResponse(role.checkIndicesAction(action) ? IndexAuthorizationResult.EMPTY : IndexAuthorizationResult.DENIED);
} else if (request instanceof IndicesRequest == false) {
if (isScrollRelatedAction(action)) {
if (SCROLL_RELATED_ACTIONS.contains(action)) {
// scroll is special
// some APIs are indices requests that are not actually associated with indices. For example,
// search scroll request, is categorized under the indices context, but doesn't hold indices names
Expand Down Expand Up @@ -1000,17 +1013,6 @@ public int hashCode() {
}
}

private static boolean isScrollRelatedAction(String action) {
return action.equals(TransportSearchScrollAction.TYPE.name())
|| action.equals(SearchTransportService.FETCH_ID_SCROLL_ACTION_NAME)
|| action.equals(SearchTransportService.QUERY_FETCH_SCROLL_ACTION_NAME)
|| action.equals(SearchTransportService.QUERY_SCROLL_ACTION_NAME)
|| action.equals(SearchTransportService.FREE_CONTEXT_SCROLL_ACTION_NAME)
|| action.equals(TransportClearScrollAction.NAME)
|| action.equals("indices:data/read/sql/close_cursor")
|| action.equals(SearchTransportService.CLEAR_SCROLL_CONTEXTS_ACTION_NAME);
}

private static boolean isAsyncRelatedAction(String action) {
return action.equals(SubmitAsyncSearchAction.NAME)
|| action.equals(GetAsyncSearchAction.NAME)
Expand Down

0 comments on commit b4610c8

Please sign in to comment.