Skip to content

Commit

Permalink
Wildcard cluster names for cross cluster search (#23985)
Browse files Browse the repository at this point in the history
This is related to #23893. This commit allows users to use wildcards for
cluster names when executing a cross cluster search.

So instead of defining every cluster such as:

GET one:*,two:*,three:*/_search

A user could just search:

GET *:*/_search

As ":" characters are currently allowed in index names, if the text
up to the first ":" does not match a defined cluster name, the entire
string is treated as an index name.
  • Loading branch information
Tim-Brooks committed Apr 11, 2017
1 parent 2da4b68 commit 6bae215
Show file tree
Hide file tree
Showing 5 changed files with 218 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.routing.PlainShardIterator;
import org.elasticsearch.cluster.routing.ShardIterator;
Expand Down Expand Up @@ -59,6 +60,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -115,11 +117,13 @@ public final class RemoteClusterService extends AbstractComponent implements Clo

// Transport layer handed in by the caller.
private final TransportService transportService;
// Value of the REMOTE_CONNECTIONS_PER_CLUSTER setting, read once at construction time.
private final int numRemoteConnections;
// Expands exact or wildcard cluster expressions into registered cluster names.
private final ClusterNameExpressionResolver clusterNameResolver;
// Connections keyed by remote cluster name; starts out empty.
private volatile Map<String, RemoteClusterConnection> remoteClusters = Collections.emptyMap();

/**
 * Creates the service without any remote cluster connections registered yet.
 *
 * @param settings         settings used for the parent component, the cluster name resolver
 *                         and the connections-per-cluster value
 * @param transportService the transport service this instance keeps a reference to
 */
RemoteClusterService(Settings settings, TransportService transportService) {
    super(settings);
    this.numRemoteConnections = REMOTE_CONNECTIONS_PER_CLUSTER.get(settings);
    this.clusterNameResolver = new ClusterNameExpressionResolver(settings);
    this.transportService = transportService;
}

Expand Down Expand Up @@ -216,25 +220,30 @@ boolean isRemoteNodeConnected(final String remoteCluster, final DiscoveryNode no
*/
// Groups the requested index expressions by target cluster. The text before the first
// REMOTE_CLUSTER_INDEX_SEPARATOR is treated as a cluster expression; the remainder is the index name.
// Expressions without a separator, or whose prefix resolves to no cluster, are stored unchanged
// under LOCAL_CLUSTER_GROUP_KEY.
// NOTE(review): this listing appears to be a diff rendered without +/- markers, so statements of the
// previous and the new implementation are interleaved (indexName and clusterName are each declared
// twice, and two alternative "is this a remote cluster" checks follow each other). Confirm against
// the real file before relying on the exact statement sequence.
Map<String, List<String>> groupClusterIndices(String[] requestIndices, Predicate<String> indexExists) {
Map<String, List<String>> perClusterIndices = new HashMap<>();
// names of the currently registered remote clusters, used as candidates for expression resolution
Set<String> remoteClusterNames = this.remoteClusters.keySet();
for (String index : requestIndices) {
int i = index.indexOf(REMOTE_CLUSTER_INDEX_SEPARATOR);
String indexName = index;
String clusterName = LOCAL_CLUSTER_GROUP_KEY;
if (i >= 0) {
String remoteClusterName = index.substring(0, i);
if (isRemoteClusterRegistered(remoteClusterName)) {
// resolve the prefix as an exact name or a wildcard; an empty result means "no such remote cluster"
List<String> clusters = clusterNameResolver.resolveClusterNames(remoteClusterNames, remoteClusterName);
if (clusters.isEmpty() == false) {
if (indexExists.test(index)) {
// we use : as a separator for remote clusters. might conflict if there is an index that is actually named
// remote_cluster_alias:index_name - for this case we fail the request. the user can easily change the cluster alias
// if that happens
throw new IllegalArgumentException("Can not filter indices; index " + index +
" exists but there is also a remote cluster named: " + remoteClusterName);
}
String indexName = index.substring(i + 1);
// register the index (without its cluster prefix) for every cluster the expression resolved to
for (String clusterName : clusters) {
perClusterIndices.computeIfAbsent(clusterName, k -> new ArrayList<>()).add(indexName);
}
indexName = index.substring(i + 1);
clusterName = remoteClusterName;
} else {
// prefix matches no remote cluster: keep the whole string as a local index expression
perClusterIndices.computeIfAbsent(LOCAL_CLUSTER_GROUP_KEY, k -> new ArrayList<>()).add(index);
}
} else {
// no separator at all: plain local index expression
perClusterIndices.computeIfAbsent(LOCAL_CLUSTER_GROUP_KEY, k -> new ArrayList<>()).add(index);
}
perClusterIndices.computeIfAbsent(clusterName, k -> new ArrayList<String>()).add(indexName);
}
return perClusterIndices;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.settings.Settings;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Resolves a cluster expression to the names of the remote clusters it refers to. An expression
 * is either the exact name of a registered cluster or a simple wildcard pattern that is matched
 * against every registered name.
 */
public final class ClusterNameExpressionResolver extends AbstractComponent {

    public ClusterNameExpressionResolver(Settings settings) {
        super(settings);
    }

    /**
     * Resolves the provided cluster expression to matching cluster names. This method only
     * supports exact or wildcard matches.
     *
     * @param remoteClusters    the aliases for remote clusters
     * @param clusterExpression the expression that can be resolved to cluster names
     * @return the resolved cluster aliases; empty when the expression is neither an exact
     *         match nor a wildcard pattern matching at least one alias
     */
    public List<String> resolveClusterNames(Set<String> remoteClusters, String clusterExpression) {
        // An exact hit wins even if the name itself contains wildcard characters.
        if (remoteClusters.contains(clusterExpression)) {
            return Collections.singletonList(clusterExpression);
        }
        if (Regex.isSimpleMatchPattern(clusterExpression)) {
            return WildcardExpressionResolver.resolve(remoteClusters, clusterExpression);
        }
        return Collections.emptyList();
    }

    /**
     * Stateless helper that expands wildcard expressions against the known cluster aliases.
     */
    private static final class WildcardExpressionResolver {

        private WildcardExpressionResolver() {
            // static helpers only
        }

        /**
         * Expands a wildcard expression to every alias it matches.
         *
         * @param remoteClusters    the aliases for remote clusters
         * @param clusterExpression a wildcard expression
         * @return the matching aliases, in no particular order; empty if nothing matches
         */
        private static List<String> resolve(Set<String> remoteClusters, String clusterExpression) {
            // A match-all pattern selects every alias, so the per-alias matching can be skipped.
            if (Regex.isMatchAllPattern(clusterExpression)) {
                return new ArrayList<>(remoteClusters);
            }

            Set<String> matches = matches(remoteClusters, clusterExpression);
            if (matches.isEmpty()) {
                return Collections.emptyList();
            }
            return new ArrayList<>(matches);
        }

        /**
         * Returns the aliases accepted by {@link Regex#simpleMatch(String, String)} for the given
         * expression. The wildcard position needs no special casing: the previous implementation
         * branched on it but executed the same code on both sides.
         */
        private static Set<String> matches(Set<String> remoteClusters, String expression) {
            return remoteClusters.stream()
                .filter(name -> Regex.simpleMatch(expression, name))
                .collect(Collectors.toSet());
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,14 @@ public void testGroupClusterIndices() throws IOException {
assertTrue(service.isRemoteClusterRegistered("cluster_2"));
assertFalse(service.isRemoteClusterRegistered("foo"));
Map<String, List<String>> perClusterIndices = service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar",
"cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo"}, i -> false);
"cluster_2:foo:bar", "cluster_1:test", "cluster_2:foo*", "foo", "cluster*:baz", "*:boo", "no*match:boo"}, i -> false);
String[] localIndices = perClusterIndices.computeIfAbsent(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY,
k -> Collections.emptyList()).toArray(new String[0]);
assertNotNull(perClusterIndices.remove(RemoteClusterService.LOCAL_CLUSTER_GROUP_KEY));
assertArrayEquals(new String[]{"foo:bar", "foo"}, localIndices);
assertArrayEquals(new String[]{"foo:bar", "foo", "no*match:boo"}, localIndices);
assertEquals(2, perClusterIndices.size());
assertEquals(Arrays.asList("bar", "test"), perClusterIndices.get("cluster_1"));
assertEquals(Arrays.asList("foo:bar", "foo*"), perClusterIndices.get("cluster_2"));
assertEquals(Arrays.asList("bar", "test", "baz", "boo"), perClusterIndices.get("cluster_1"));
assertEquals(Arrays.asList("foo:bar", "foo*", "baz", "boo"), perClusterIndices.get("cluster_2"));

IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () ->
service.groupClusterIndices(new String[]{"foo:bar", "cluster_1:bar",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.cluster.metadata;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.test.ESTestCase;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Unit tests for {@link ClusterNameExpressionResolver}: exact names, non-matching expressions and
 * wildcards at the start, middle and end of the expression.
 */
public class ClusterNameExpressionResolverTests extends ESTestCase {

    // Fixed set of registered cluster aliases that every test resolves against.
    private static final Set<String> remoteClusters =
        new HashSet<>(Arrays.asList("cluster1", "cluster2", "totallyDifferent"));

    private ClusterNameExpressionResolver clusterNameResolver = new ClusterNameExpressionResolver(Settings.EMPTY);

    public void testExactMatch() {
        assertResolvesTo("totallyDifferent", "totallyDifferent");
    }

    public void testNoWildCardNoMatch() {
        assertTrue(resolve("totallyDifferent2").isEmpty());
    }

    public void testWildCardNoMatch() {
        assertTrue(resolve("totally*2").isEmpty());
    }

    public void testSimpleWildCard() {
        assertResolvesTo("*", "cluster1", "cluster2", "totallyDifferent");
    }

    public void testSuffixWildCard() {
        assertResolvesTo("cluster*", "cluster1", "cluster2");
    }

    public void testPrefixWildCard() {
        assertResolvesTo("*Different", "totallyDifferent");
    }

    public void testMiddleWildCard() {
        assertResolvesTo("clu*1", "cluster1");
    }

    // Resolves the expression against the fixed alias set.
    private List<String> resolve(String expression) {
        return clusterNameResolver.resolveClusterNames(remoteClusters, expression);
    }

    // Compares as sets because the resolver gives no ordering guarantee for its result.
    private void assertResolvesTo(String expression, String... expectedClusters) {
        Set<String> expected = new HashSet<>(Arrays.asList(expectedClusters));
        Set<String> actual = new HashSet<>(resolve(expression));
        assertEquals(expected, actual);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,30 @@
- match: { hits.total: 6 }
- match: { hits.hits.0._index: "test_remote_cluster:test_index" }

---
# Searches test_index through the wildcard cluster expression "*" after registering a second remote alias.
"Test wildcard search":
# Fetch the cluster settings, defaults included, so the existing remote seed can be read back.
- do:
cluster.get_settings:
include_defaults: true

# Stash the first seed of my_remote_cluster as remote_ip.
- set: { defaults.search.remote.my_remote_cluster.seeds.0: remote_ip }

# Register the stashed seed under the additional alias test_remote_cluster.
- do:
cluster.put_settings:
flat_settings: true
body:
transient:
search.remote.test_remote_cluster.seeds: $remote_ip

- match: {transient: {search.remote.test_remote_cluster.seeds: $remote_ip}}

# The cluster part of the index expression is a wildcard instead of an explicit alias.
- do:
search:
index: "*:test_index"

- match: { _shards.total: 6 }
- match: { hits.total: 12 }

---
"Search an filtered alias on the remote cluster":

Expand Down

0 comments on commit 6bae215

Please sign in to comment.