Skip to content

Commit

Permalink
Clarify RemoteClusterService#groupIndices behaviour (#33899)
Browse files Browse the repository at this point in the history
When executing a cross-cluster search, we need to search against all local indices (and no remote indices) in case no indices are specified. Also, if only remote indices are specified, no local indices will be queried. We previously added empty local indices whenever they were not present in the map of the grouped indices, then we would act differently later based on the extracted remote indices. Instead, we now add the empty array for local indices only in case we need to search all local indices; the entry for local indices is not added when local indices should not be searched. This way the grouped indices reflect reality and provide a better indication of what indices will be searched.
  • Loading branch information
javanna authored Sep 24, 2018
1 parent 47ed6c7 commit e389d9e
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
request.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
final OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
final String[] concreteIndices;
if (remoteClusterIndices.isEmpty() == false && localIndices.indices().length == 0) {
// in the case we have one or more remote indices but no local we don't expand to all local indices and just do remote
// indices
if (localIndices == null) {
// in the case we have one or more remote indices but no local we don't expand to all local indices and just do remote indices
concreteIndices = Strings.EMPTY_ARRAY;
} else {
concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, localIndices);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -193,7 +194,7 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<
searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
if (remoteClusterIndices.isEmpty()) {
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteClusterIndices, Collections.emptyList(),
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, Collections.emptyList(),
(clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, SearchResponse.Clusters.EMPTY);
} else {
remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(),
Expand All @@ -203,7 +204,7 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<
BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses,
remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
SearchResponse.Clusters clusters = buildClusters(localIndices, remoteClusterIndices, searchShardsResponses);
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteClusterIndices,
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices,
remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener,
clusters);
}, listener::onFailure));
Expand All @@ -219,7 +220,7 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<

static SearchResponse.Clusters buildClusters(OriginalIndices localIndices, Map<String, OriginalIndices> remoteIndices,
Map<String, ClusterSearchShardsResponse> searchShardsResponses) {
int localClusters = Math.min(localIndices.indices().length, 1);
int localClusters = localIndices == null ? 0 : 1;
int totalClusters = remoteIndices.size() + localClusters;
int successfulClusters = localClusters;
for (ClusterSearchShardsResponse searchShardsResponse : searchShardsResponses.values()) {
Expand Down Expand Up @@ -277,8 +278,19 @@ static BiFunction<String, String, DiscoveryNode> processRemoteShards(Map<String,
};
}

private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices,
Map<String, OriginalIndices> remoteClusterIndices, List<SearchShardIterator> remoteShardIterators,
private Index[] resolveLocalIndices(OriginalIndices localIndices,
IndicesOptions indicesOptions,
ClusterState clusterState,
SearchTimeProvider timeProvider) {
if (localIndices == null) {
return Index.EMPTY_ARRAY; //don't search on any local index (happens when only remote indices were specified)
}
return indexNameExpressionResolver.concreteIndices(clusterState, indicesOptions,
timeProvider.getAbsoluteStartMillis(), localIndices.indices());
}

private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,
OriginalIndices localIndices, List<SearchShardIterator> remoteShardIterators,
BiFunction<String, String, DiscoveryNode> remoteConnections, ClusterState clusterState,
Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener,
SearchResponse.Clusters clusters) {
Expand All @@ -287,13 +299,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
// date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
// of just for the _search api
final Index[] indices;
if (localIndices.indices().length == 0 && remoteClusterIndices.isEmpty() == false) {
indices = Index.EMPTY_ARRAY; // don't search on _all if only remote indices were specified
} else {
indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(),
timeProvider.getAbsoluteStartMillis(), localIndices.indices());
}
final Index[] indices = resolveLocalIndices(localIndices, searchRequest.indicesOptions(), clusterState, timeProvider);
Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
searchRequest.indices());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,16 @@ public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions,
Map<String, OriginalIndices> originalIndicesMap = new HashMap<>();
if (isCrossClusterSearchEnabled()) {
final Map<String, List<String>> groupedIndices = groupClusterIndices(indices, indexExists);
for (Map.Entry<String, List<String>> entry : groupedIndices.entrySet()) {
String clusterAlias = entry.getKey();
List<String> originalIndices = entry.getValue();
originalIndicesMap.put(clusterAlias,
new OriginalIndices(originalIndices.toArray(new String[originalIndices.size()]), indicesOptions));
}
if (originalIndicesMap.containsKey(LOCAL_CLUSTER_GROUP_KEY) == false) {
if (groupedIndices.isEmpty()) {
//search on _all in the local cluster if neither local indices nor remote indices were specified
originalIndicesMap.put(LOCAL_CLUSTER_GROUP_KEY, new OriginalIndices(Strings.EMPTY_ARRAY, indicesOptions));
} else {
for (Map.Entry<String, List<String>> entry : groupedIndices.entrySet()) {
String clusterAlias = entry.getKey();
List<String> originalIndices = entry.getValue();
originalIndicesMap.put(clusterAlias,
new OriginalIndices(originalIndices.toArray(new String[originalIndices.size()]), indicesOptions));
}
}
} else {
originalIndicesMap.put(LOCAL_CLUSTER_GROUP_KEY, new OriginalIndices(indices, indicesOptions));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -64,7 +63,7 @@ public void tearDown() throws Exception {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}

public void testMergeShardsIterators() throws IOException {
public void testMergeShardsIterators() {
List<ShardIterator> localShardIterators = new ArrayList<>();
{
ShardId shardId = new ShardId("local_index", "local_index_uuid", 0);
Expand Down Expand Up @@ -146,7 +145,7 @@ public void testMergeShardsIterators() throws IOException {
}
}

public void testProcessRemoteShards() throws IOException {
public void testProcessRemoteShards() {
try (TransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool,
null)) {
RemoteClusterService service = transportService.getRemoteClusterService();
Expand Down Expand Up @@ -241,12 +240,12 @@ public void testProcessRemoteShards() throws IOException {
}

public void testBuildClusters() {
OriginalIndices localIndices = randomOriginalIndices();
OriginalIndices localIndices = randomBoolean() ? null : randomOriginalIndices();
Map<String, OriginalIndices> remoteIndices = new HashMap<>();
Map<String, ClusterSearchShardsResponse> searchShardsResponses = new HashMap<>();
int numRemoteClusters = randomIntBetween(0, 10);
boolean onlySuccessful = randomBoolean();
int localClusters = localIndices.indices().length == 0 ? 0 : 1;
int localClusters = localIndices == null ? 0 : 1;
int total = numRemoteClusters + localClusters;
int successful = localClusters;
int skipped = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.AbstractScopedSettings;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -151,7 +152,6 @@ public void testBuildRemoteClustersDynamicConfig() throws Exception {
assertEquals(boom.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
}


public void testGroupClusterIndices() throws IOException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
Expand Down Expand Up @@ -179,10 +179,9 @@ public void testGroupClusterIndices() throws IOException {
Map<String, List<String>> perClusterIndices = service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar",
"cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo", "cluster*:baz", "*:boo", "no*match:boo"},
i -> false);
String[] localIndices = perClusterIndices.computeIfAbsent(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
k -> Collections.emptyList()).toArray(new String[0]);
assertNotNull(perClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
assertArrayEquals(new String[]{"foo:bar", "foo", "no*match:boo"}, localIndices);
List<String> localIndices = perClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
assertNotNull(localIndices);
assertEquals(Arrays.asList("foo:bar", "foo", "no*match:boo"), localIndices);
assertEquals(2, perClusterIndices.size());
assertEquals(Arrays.asList("bar", "test", "baz", "boo"), perClusterIndices.get("cluster_1"));
assertEquals(Arrays.asList("foo:bar", "foo*", "baz", "boo"), perClusterIndices.get("cluster_2"));
Expand All @@ -198,6 +197,68 @@ public void testGroupClusterIndices() throws IOException {
}
}

public void testGroupIndices() throws IOException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(otherSeedTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());

try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool,
null)) {
transportService.start();
transportService.acceptIncomingRequests();
Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString());
builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
assertFalse(service.isRemoteClusterRegistered("foo"));
{
Map<String, OriginalIndices> perClusterIndices = service.groupIndices(IndicesOptions.LENIENT_EXPAND_OPEN,
new String[]{"foo:bar", "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo",
"cluster*:baz", "*:boo", "no*match:boo"},
i -> false);
assertEquals(3, perClusterIndices.size());
assertArrayEquals(new String[]{"foo:bar", "foo", "no*match:boo"},
perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).indices());
assertArrayEquals(new String[]{"bar", "test", "baz", "boo"}, perClusterIndices.get("cluster_1").indices());
assertArrayEquals(new String[]{"foo:bar", "foo*", "baz", "boo"}, perClusterIndices.get("cluster_2").indices());
}
{
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () ->
service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar",
"cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, "cluster_1:bar"::equals));
assertEquals("Can not filter indices; index cluster_1:bar exists but there is also a remote cluster named:" +
" cluster_1", iae.getMessage());
}
{
Map<String, OriginalIndices> perClusterIndices = service.groupIndices(IndicesOptions.LENIENT_EXPAND_OPEN,
new String[]{"cluster_1:bar", "cluster_2:foo*"},
i -> false);
assertEquals(2, perClusterIndices.size());
assertArrayEquals(new String[]{"bar"}, perClusterIndices.get("cluster_1").indices());
assertArrayEquals(new String[]{"foo*"}, perClusterIndices.get("cluster_2").indices());
}
{
Map<String, OriginalIndices> perClusterIndices = service.groupIndices(IndicesOptions.LENIENT_EXPAND_OPEN,
Strings.EMPTY_ARRAY,
i -> false);
assertEquals(1, perClusterIndices.size());
assertArrayEquals(Strings.EMPTY_ARRAY, perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).indices());
}
}
}
}
}

public void testIncrementallyAddClusters() throws IOException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
Expand Down

0 comments on commit e389d9e

Please sign in to comment.