Skip to content

Commit

Permalink
Merge branch 'main' into realCBInternal
Browse files Browse the repository at this point in the history
  • Loading branch information
elasticmachine authored Nov 20, 2024
2 parents fdbb072 + 87b3de8 commit 44ad580
Show file tree
Hide file tree
Showing 12 changed files with 35 additions and 520 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.core.UpdateForV9;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.features.FeatureService;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
Expand All @@ -27,8 +27,7 @@
import java.io.IOException;
import java.util.List;

@UpdateForV9(owner = UpdateForV9.Owner.CORE_INFRA)
// @UpdateForV10 // this can be removed in v10. It may be called by v8 nodes to v9 nodes.
@UpdateForV10(owner = UpdateForV10.Owner.CORE_INFRA) // this can be removed in v10. It may be called by v8 nodes to v9 nodes.
public class TransportNodesFeaturesAction extends TransportNodesAction<
NodesFeaturesRequest,
NodesFeaturesResponse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ protected void performPhaseOnShard(final int shardIndex, final SearchShardIterat
}

private void doPerformPhaseOnShard(int shardIndex, SearchShardIterator shardIt, SearchShardTarget shard, Releasable releasable) {
executePhaseOnShard(shardIt, shard, new SearchActionListener<>(shard, shardIndex) {
var shardListener = new SearchActionListener<Result>(shard, shardIndex) {
@Override
public void innerOnResponse(Result result) {
try {
Expand All @@ -315,7 +315,15 @@ public void onFailure(Exception e) {
releasable.close();
onShardFailure(shardIndex, shard, shardIt, e);
}
});
};
final Transport.Connection connection;
try {
connection = getConnection(shard.getClusterAlias(), shard.getNodeId());
} catch (Exception e) {
shardListener.onFailure(e);
return;
}
executePhaseOnShard(shardIt, connection, shardListener);
}

private void failOnUnavailable(int shardIndex, SearchShardIterator shardIt) {
Expand All @@ -327,12 +335,12 @@ private void failOnUnavailable(int shardIndex, SearchShardIterator shardIt) {
/**
* Sends the request to the actual shard.
* @param shardIt the shards iterator
* @param shard the shard routing to send the request for
* @param connection to node that the shard is located on
* @param listener the listener to notify on response
*/
protected abstract void executePhaseOnShard(
SearchShardIterator shardIt,
SearchShardTarget shard,
Transport.Connection connection,
SearchActionListener<Result> listener
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,9 @@ final class SearchDfsQueryThenFetchAsyncAction extends AbstractSearchAsyncAction
@Override
protected void executePhaseOnShard(
final SearchShardIterator shardIt,
final SearchShardTarget shard,
final Transport.Connection connection,
final SearchActionListener<DfsSearchResult> listener
) {
final Transport.Connection connection;
try {
connection = getConnection(shard.getClusterAlias(), shard.getNodeId());
} catch (Exception e) {
listener.onFailure(e);
return;
}
getSearchTransport().sendExecuteDfs(connection, buildShardSearchRequest(shardIt, listener.requestIndex), getTask(), listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ protected static void doCheckNoMissingShards(
/**
* Releases shard targets that are not used in the docsIdsToLoad.
*/
protected void releaseIrrelevantSearchContext(SearchPhaseResult searchPhaseResult, AbstractSearchAsyncAction<?> context) {
protected static void releaseIrrelevantSearchContext(SearchPhaseResult searchPhaseResult, AbstractSearchAsyncAction<?> context) {
// we only release search context that we did not fetch from, if we are not scrolling
// or using a PIT and if it has at least one hit that didn't make it to the global topDocs
if (searchPhaseResult == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,9 @@ class SearchQueryThenFetchAsyncAction extends AbstractSearchAsyncAction<SearchPh

protected void executePhaseOnShard(
final SearchShardIterator shardIt,
final SearchShardTarget shard,
final Transport.Connection connection,
final SearchActionListener<SearchPhaseResult> listener
) {
final Transport.Connection connection;
try {
connection = getConnection(shard.getClusterAlias(), shard.getNodeId());
} catch (Exception e) {
listener.onFailure(e);
return;
}
ShardSearchRequest request = rewriteShardSearchRequest(super.buildShardSearchRequest(shardIt, listener.requestIndex));
getSearchTransport().sendExecuteQuery(connection, request, getTask(), listener);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchContextId;
Expand Down Expand Up @@ -252,16 +251,9 @@ protected String missingShardsErrorMessage(StringBuilder missingShards) {
@Override
protected void executePhaseOnShard(
SearchShardIterator shardIt,
SearchShardTarget shard,
Transport.Connection connection,
SearchActionListener<SearchPhaseResult> phaseListener
) {
final Transport.Connection connection;
try {
connection = connectionLookup.apply(shardIt.getClusterAlias(), shard.getNodeId());
} catch (Exception e) {
phaseListener.onFailure(e);
return;
}
transportService.sendChildRequest(
connection,
OPEN_SHARD_READER_CONTEXT_NAME,
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.elasticsearch.cluster.coordination.Coordinator;
import org.elasticsearch.cluster.coordination.MasterHistoryService;
import org.elasticsearch.cluster.coordination.StableMasterHealthIndicatorService;
import org.elasticsearch.cluster.features.NodeFeaturesFixupListener;
import org.elasticsearch.cluster.metadata.DataStreamGlobalRetentionSettings;
import org.elasticsearch.cluster.metadata.IndexMetadataVerifier;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -787,7 +786,6 @@ private void construct(

if (DiscoveryNode.isMasterNode(settings)) {
clusterService.addListener(new SystemIndexMappingUpdateService(systemIndices, client));
clusterService.addListener(new NodeFeaturesFixupListener(clusterService, client.admin().cluster(), threadPool));
}

SourceFieldMetrics sourceFieldMetrics = new SourceFieldMetrics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ protected SearchPhase getNextPhase() {
@Override
protected void executePhaseOnShard(
final SearchShardIterator shardIt,
final SearchShardTarget shard,
final Transport.Connection shard,
final SearchActionListener<SearchPhaseResult> listener
) {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void executeNextPhase(SearchPhase currentPhase, Supplier<SearchPhase> nex
@Override
protected void executePhaseOnShard(
SearchShardIterator shardIt,
SearchShardTarget shard,
Transport.Connection shard,
SearchActionListener<SearchPhaseResult> listener
) {
onShardResult(new SearchPhaseResult() {
Expand Down
Loading

0 comments on commit 44ad580

Please sign in to comment.