Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clarify RemoteClusterService#groupIndices behaviour #33899

Merged
merged 2 commits into from
Sep 24, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ protected void doExecute(Task task, FieldCapabilitiesRequest request, final Acti
request.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
final OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
final String[] concreteIndices;
if (remoteClusterIndices.isEmpty() == false && localIndices.indices().length == 0) {
// in the case we have one or more remote indices but no local we don't expand to all local indices and just do remote
// indices
if (localIndices == null) {
// in the case we have one or more remote indices but no local we don't expand to all local indices and just do remote indices
concreteIndices = Strings.EMPTY_ARRAY;
} else {
concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, localIndices);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
Expand Down Expand Up @@ -193,7 +194,7 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<
searchRequest.indices(), idx -> indexNameExpressionResolver.hasIndexOrAlias(idx, clusterState));
OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
if (remoteClusterIndices.isEmpty()) {
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, remoteClusterIndices, Collections.emptyList(),
executeSearch((SearchTask)task, timeProvider, searchRequest, localIndices, Collections.emptyList(),
(clusterName, nodeId) -> null, clusterState, Collections.emptyMap(), listener, SearchResponse.Clusters.EMPTY);
} else {
remoteClusterService.collectSearchShards(searchRequest.indicesOptions(), searchRequest.preference(),
Expand All @@ -203,7 +204,7 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<
BiFunction<String, String, DiscoveryNode> clusterNodeLookup = processRemoteShards(searchShardsResponses,
remoteClusterIndices, remoteShardIterators, remoteAliasFilters);
SearchResponse.Clusters clusters = buildClusters(localIndices, remoteClusterIndices, searchShardsResponses);
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices, remoteClusterIndices,
executeSearch((SearchTask) task, timeProvider, searchRequest, localIndices,
remoteShardIterators, clusterNodeLookup, clusterState, remoteAliasFilters, listener,
clusters);
}, listener::onFailure));
Expand All @@ -219,7 +220,7 @@ protected void doExecute(Task task, SearchRequest searchRequest, ActionListener<

static SearchResponse.Clusters buildClusters(OriginalIndices localIndices, Map<String, OriginalIndices> remoteIndices,
Map<String, ClusterSearchShardsResponse> searchShardsResponses) {
int localClusters = Math.min(localIndices.indices().length, 1);
int localClusters = localIndices == null ? 0 : 1;
int totalClusters = remoteIndices.size() + localClusters;
int successfulClusters = localClusters;
for (ClusterSearchShardsResponse searchShardsResponse : searchShardsResponses.values()) {
Expand Down Expand Up @@ -277,8 +278,19 @@ static BiFunction<String, String, DiscoveryNode> processRemoteShards(Map<String,
};
}

private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest, OriginalIndices localIndices,
Map<String, OriginalIndices> remoteClusterIndices, List<SearchShardIterator> remoteShardIterators,
private Index[] resolveLocalIndices(OriginalIndices localIndices,
IndicesOptions indicesOptions,
ClusterState clusterState,
SearchTimeProvider timeProvider) {
if (localIndices == null) {
return Index.EMPTY_ARRAY; //don't search on any local index (happens when only remote indices were specified)
}
return indexNameExpressionResolver.concreteIndices(clusterState, indicesOptions,
timeProvider.getAbsoluteStartMillis(), localIndices.indices());
}

private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, SearchRequest searchRequest,
OriginalIndices localIndices, List<SearchShardIterator> remoteShardIterators,
BiFunction<String, String, DiscoveryNode> remoteConnections, ClusterState clusterState,
Map<String, AliasFilter> remoteAliasMap, ActionListener<SearchResponse> listener,
SearchResponse.Clusters clusters) {
Expand All @@ -287,13 +299,7 @@ private void executeSearch(SearchTask task, SearchTimeProvider timeProvider, Sea
// TODO: I think startTime() should become part of ActionRequest and that should be used both for index name
// date math expressions and $now in scripts. This way all apis will deal with now in the same way instead
// of just for the _search api
final Index[] indices;
if (localIndices.indices().length == 0 && remoteClusterIndices.isEmpty() == false) {
indices = Index.EMPTY_ARRAY; // don't search on _all if only remote indices were specified
} else {
indices = indexNameExpressionResolver.concreteIndices(clusterState, searchRequest.indicesOptions(),
timeProvider.getAbsoluteStartMillis(), localIndices.indices());
}
final Index[] indices = resolveLocalIndices(localIndices, searchRequest.indicesOptions(), clusterState, timeProvider);
Map<String, AliasFilter> aliasFilter = buildPerIndexAliasFilter(searchRequest, clusterState, indices, remoteAliasMap);
Map<String, Set<String>> routingMap = indexNameExpressionResolver.resolveSearchRouting(clusterState, searchRequest.routing(),
searchRequest.indices());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,14 +262,16 @@ public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions,
Map<String, OriginalIndices> originalIndicesMap = new HashMap<>();
if (isCrossClusterSearchEnabled()) {
final Map<String, List<String>> groupedIndices = groupClusterIndices(indices, indexExists);
for (Map.Entry<String, List<String>> entry : groupedIndices.entrySet()) {
String clusterAlias = entry.getKey();
List<String> originalIndices = entry.getValue();
originalIndicesMap.put(clusterAlias,
new OriginalIndices(originalIndices.toArray(new String[originalIndices.size()]), indicesOptions));
}
if (originalIndicesMap.containsKey(LOCAL_CLUSTER_GROUP_KEY) == false) {
if (groupedIndices.isEmpty()) {
//search on _all in the local cluster if neither local indices nor remote indices were specified
originalIndicesMap.put(LOCAL_CLUSTER_GROUP_KEY, new OriginalIndices(Strings.EMPTY_ARRAY, indicesOptions));
} else {
for (Map.Entry<String, List<String>> entry : groupedIndices.entrySet()) {
String clusterAlias = entry.getKey();
List<String> originalIndices = entry.getValue();
originalIndicesMap.put(clusterAlias,
new OriginalIndices(originalIndices.toArray(new String[originalIndices.size()]), indicesOptions));
}
}
} else {
originalIndicesMap.put(LOCAL_CLUSTER_GROUP_KEY, new OriginalIndices(indices, indicesOptions));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -64,7 +63,7 @@ public void tearDown() throws Exception {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}

public void testMergeShardsIterators() throws IOException {
public void testMergeShardsIterators() {
List<ShardIterator> localShardIterators = new ArrayList<>();
{
ShardId shardId = new ShardId("local_index", "local_index_uuid", 0);
Expand Down Expand Up @@ -146,7 +145,7 @@ public void testMergeShardsIterators() throws IOException {
}
}

public void testProcessRemoteShards() throws IOException {
public void testProcessRemoteShards() {
try (TransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool,
null)) {
RemoteClusterService service = transportService.getRemoteClusterService();
Expand Down Expand Up @@ -241,12 +240,12 @@ public void testProcessRemoteShards() throws IOException {
}

public void testBuildClusters() {
OriginalIndices localIndices = randomOriginalIndices();
OriginalIndices localIndices = randomBoolean() ? null : randomOriginalIndices();
Map<String, OriginalIndices> remoteIndices = new HashMap<>();
Map<String, ClusterSearchShardsResponse> searchShardsResponses = new HashMap<>();
int numRemoteClusters = randomIntBetween(0, 10);
boolean onlySuccessful = randomBoolean();
int localClusters = localIndices.indices().length == 0 ? 0 : 1;
int localClusters = localIndices == null ? 0 : 1;
int total = numRemoteClusters + localClusters;
int successful = localClusters;
int skipped = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.settings.AbstractScopedSettings;
import org.elasticsearch.common.settings.ClusterSettings;
Expand Down Expand Up @@ -151,7 +152,6 @@ public void testBuildRemoteClustersDynamicConfig() throws Exception {
assertEquals(boom.getVersion(), Version.CURRENT.minimumCompatibilityVersion());
}


public void testGroupClusterIndices() throws IOException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
Expand Down Expand Up @@ -179,10 +179,9 @@ public void testGroupClusterIndices() throws IOException {
Map<String, List<String>> perClusterIndices = service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar",
"cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo", "cluster*:baz", "*:boo", "no*match:boo"},
i -> false);
String[] localIndices = perClusterIndices.computeIfAbsent(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY,
k -> Collections.emptyList()).toArray(new String[0]);
assertNotNull(perClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY));
assertArrayEquals(new String[]{"foo:bar", "foo", "no*match:boo"}, localIndices);
List<String> localIndices = perClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY);
assertNotNull(localIndices);
assertEquals(Arrays.asList("foo:bar", "foo", "no*match:boo"), localIndices);
assertEquals(2, perClusterIndices.size());
assertEquals(Arrays.asList("bar", "test", "baz", "boo"), perClusterIndices.get("cluster_1"));
assertEquals(Arrays.asList("foo:bar", "foo*", "baz", "boo"), perClusterIndices.get("cluster_2"));
Expand All @@ -198,6 +197,68 @@ public void testGroupClusterIndices() throws IOException {
}
}

public void testGroupIndices() throws IOException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
MockTransportService otherSeedTransport = startTransport("cluster_2_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode otherSeedNode = otherSeedTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(otherSeedTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());

try (MockTransportService transportService = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool,
null)) {
transportService.start();
transportService.acceptIncomingRequests();
Settings.Builder builder = Settings.builder();
builder.putList("cluster.remote.cluster_1.seeds", seedNode.getAddress().toString());
builder.putList("cluster.remote.cluster_2.seeds", otherSeedNode.getAddress().toString());
try (RemoteClusterService service = new RemoteClusterService(builder.build(), transportService)) {
assertFalse(service.isCrossClusterSearchEnabled());
service.initializeRemoteClusters();
assertTrue(service.isCrossClusterSearchEnabled());
assertTrue(service.isRemoteClusterRegistered("cluster_1"));
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
assertFalse(service.isRemoteClusterRegistered("foo"));
{
Map<String, OriginalIndices> perClusterIndices = service.groupIndices(IndicesOptions.LENIENT_EXPAND_OPEN,
new String[]{"foo:bar", "cluster_1:bar", "cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo",
"cluster*:baz", "*:boo", "no*match:boo"},
i -> false);
assertEquals(3, perClusterIndices.size());
assertArrayEquals(new String[]{"foo:bar", "foo", "no*match:boo"},
perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).indices());
assertArrayEquals(new String[]{"bar", "test", "baz", "boo"}, perClusterIndices.get("cluster_1").indices());
assertArrayEquals(new String[]{"foo:bar", "foo*", "baz", "boo"}, perClusterIndices.get("cluster_2").indices());
}
{
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () ->
service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar",
"cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, "cluster_1:bar"::equals));
assertEquals("Can not filter indices; index cluster_1:bar exists but there is also a remote cluster named:" +
" cluster_1", iae.getMessage());
}
{
Map<String, OriginalIndices> perClusterIndices = service.groupIndices(IndicesOptions.LENIENT_EXPAND_OPEN,
new String[]{"cluster_1:bar", "cluster_2:foo*"},
i -> false);
assertEquals(2, perClusterIndices.size());
assertArrayEquals(new String[]{"bar"}, perClusterIndices.get("cluster_1").indices());
assertArrayEquals(new String[]{"foo*"}, perClusterIndices.get("cluster_2").indices());
}
{
Map<String, OriginalIndices> perClusterIndices = service.groupIndices(IndicesOptions.LENIENT_EXPAND_OPEN,
Strings.EMPTY_ARRAY,
i -> false);
assertEquals(1, perClusterIndices.size());
assertArrayEquals(Strings.EMPTY_ARRAY, perClusterIndices.get(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY).indices());
}
}
}
}
}

public void testIncrementallyAddClusters() throws IOException {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("cluster_1_node", knownNodes, Version.CURRENT);
Expand Down