Skip to content

Commit

Permalink
Make all remote index name parsing go through RemoteClusterAware
Browse files Browse the repository at this point in the history
  • Loading branch information
smalyshev committed Sep 25, 2024
1 parent 3bd235d commit 19dc3bb
Show file tree
Hide file tree
Showing 17 changed files with 91 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -238,29 +237,20 @@ static Tuple<String, String> parseClusterAliasAndIndex(String indexExpression) {
return new Tuple<>(null, null);
}
String trimmed = indexExpression.trim();
String sep = String.valueOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR);
if (trimmed.startsWith(sep) || trimmed.endsWith(sep)) {
throw new IllegalArgumentException(
"Unable to parse one single valid index name from the provided index: [" + indexExpression + "]"
);
}

String[] parts = RemoteClusterAware.splitIndexName(trimmed);
// The parser here needs to ensure that the indexExpression is not of the form "remote1:blogs,remote2:blogs"
// because (1) only a single index is allowed for Painless Execute and
// (2) if this method returns Tuple("remote1", "blogs,remote2:blogs") that will not fail with "index not found".
// Instead, it will fail with the inaccurate and confusing error message:
// "Cross-cluster calls are not supported in this context but remote indices were requested: [blogs,remote1:blogs]"
// which comes later out of the IndexNameExpressionResolver pathway this code uses.
String[] parts = indexExpression.split(sep, 2);
if (parts.length == 1) {
return new Tuple<>(null, parts[0]);
} else if (parts.length == 2 && parts[1].contains(sep) == false) {
return new Tuple<>(parts[0], parts[1]);
} else {
if (parts[1].isEmpty() || parts[1].contains(String.valueOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR))) {
throw new IllegalArgumentException(
"Unable to parse one single valid index name from the provided index: [" + indexExpression + "]"
);
}

return new Tuple<>(parts[0], parts[1]);
}

public String getClusterAlias() {
Expand Down Expand Up @@ -556,18 +546,15 @@ protected void doExecute(Task task, Request request, ActionListener<Response> li
// Visible for testing
static void removeClusterAliasFromIndexExpression(Request request) {
if (request.index() != null) {
String[] split = request.index().split(String.valueOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR));
if (split.length > 1) {
String[] split = RemoteClusterAware.splitIndexName(request.index());
if (split[0] != null) {
/*
* if the cluster alias is null and the index field has a clusterAlias (clusterAlias:index notation)
* that means this is executing on a remote cluster (it was forwarded by the querying cluster).
* The clusterAlias is not Writeable, so it will be null in the ContextSetup on the remote cluster.
* We need to strip off the clusterAlias from the index before executing the script locally,
* so it will resolve to a local index
*/
assert split.length == 2
: "If the index contains the REMOTE_CLUSTER_INDEX_SEPARATOR it should have only two parts but it has "
+ Arrays.toString(split);
request.index(split[1]);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,21 +156,10 @@ static void validateAgainstAliases(
}

private static SearchRequest skipRemoteIndexNames(SearchRequest source) {
return new SearchRequest(source).indices(
Arrays.stream(source.indices()).filter(name -> isRemoteExpression(name) == false).toArray(String[]::new)
);
}

private static boolean isRemoteExpression(String expression) {
// An index expression that references a remote cluster uses ":" to separate the cluster-alias from the index portion of the
// expression, e.g., cluster0:index-name
// in the same time date-math `expression` can also contain ':' symbol inside its name
// to distinguish between those two, given `expression` is pre-evaluated using date-math resolver
// after evaluation date-math `expression` should not contain ':' symbol
// otherwise if `expression` is legit remote name, ':' symbol remains
// NOTE: index expressions can be prefixed with "-", which will not be parsed by resolveDateMathExpression,
// but in this particular case it doesn't seem to be relevant.
return IndexNameExpressionResolver.resolveDateMathExpression(expression)
.contains(String.valueOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR));
return new SearchRequest(source).indices(
Arrays.stream(source.indices()).filter(name -> RemoteClusterAware.isRemoteIndexName(name) == false).toArray(String[]::new)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.ValidateActions;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.transport.RemoteClusterService;
import org.elasticsearch.transport.RemoteClusterAware;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -166,13 +165,7 @@ public String getDescription() {

boolean localIndicesPresent(String[] indices) {
for (String index : indices) {
// ensure that `index` is a remote name and not a date math expression which includes ':' symbol
// since date math expression after evaluation should not contain ':' symbol
// NOTE: index expressions can be prefixed with "-" for index exclusion, which will not be parsed by resolveDateMathExpression
String indexExpression = IndexNameExpressionResolver.resolveDateMathExpression(
index.charAt(0) == '-' ? index.substring(1) : index
);
if (indexExpression.indexOf(RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR) < 0) {
if (RemoteClusterAware.isRemoteIndexName(index) == false) {
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,9 @@ public static ShardSearchFailure fromXContent(XContentParser parser) throws IOEx
if (SHARD_FIELD.equals(currentFieldName)) {
shardId = parser.intValue();
} else if (INDEX_FIELD.equals(currentFieldName)) {
indexName = parser.text();
int indexOf = indexName.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR);
if (indexOf > 0) {
clusterAlias = indexName.substring(0, indexOf);
indexName = indexName.substring(indexOf + 1);
}
String[] split = RemoteClusterAware.splitIndexName(parser.text());
clusterAlias = split[0];
indexName = split[1];
} else if (NODE_FIELD.equals(currentFieldName)) {
nodeId = parser.text();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,9 @@ private static SearchContextIdForNode readSearchContextIdForNodeExcludingContext

private static SearchContextIdForNode innerReadSearchContextIdForNode(String contextUUID, StreamInput in) throws IOException {
long id = in.readLong();
String target = in.readString();
String clusterAlias;
final int index = target.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR);
if (index == -1) {
clusterAlias = null;
} else {
clusterAlias = target.substring(0, index);
target = target.substring(index + 1);
}
String[] split = RemoteClusterAware.splitIndexName(in.readString());
String clusterAlias = split[0];
String target = split[1];
return new SearchContextIdForNode(clusterAlias, target, new ShardSearchContextId(contextUUID, id));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.elasticsearch.indices.InvalidIndexNameException;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.indices.SystemIndices.SystemIndexAccessLevel;
import org.elasticsearch.transport.RemoteClusterAware;

import java.time.Instant;
import java.time.ZoneId;
Expand Down Expand Up @@ -1753,7 +1754,7 @@ private static void ensureRemoteIndicesRequireIgnoreUnavailable(IndicesOptions o
return;
}
for (String index : indexExpressions) {
if (index.contains(":")) {
if (RemoteClusterAware.isRemoteIndexName(index)) {
failOnRemoteIndicesNotIgnoringUnavailable(indexExpressions);
}
}
Expand All @@ -1762,7 +1763,7 @@ private static void ensureRemoteIndicesRequireIgnoreUnavailable(IndicesOptions o
private static void failOnRemoteIndicesNotIgnoringUnavailable(List<String> indexExpressions) {
List<String> crossClusterIndices = new ArrayList<>();
for (String index : indexExpressions) {
if (index.contains(":")) {
if (RemoteClusterAware.isRemoteIndexName(index)) {
crossClusterIndices.add(index);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,12 @@ public SearchIndexNameMatcher(
* the separator ':', and must match on both the cluster alias and index name.
*/
public boolean test(String pattern) {
int separatorIndex = pattern.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR);
if (separatorIndex < 0) {
String[] splitIndex = RemoteClusterAware.splitIndexName(pattern);

if (splitIndex[0] == null) {
return clusterAlias == null && matchesIndex(pattern);
} else {
String clusterPattern = pattern.substring(0, separatorIndex);
String indexPattern = pattern.substring(separatorIndex + 1);

return Regex.simpleMatch(clusterPattern, clusterAlias) && matchesIndex(indexPattern);
return Regex.simpleMatch(splitIndex[0], clusterAlias) && matchesIndex(splitIndex[1]);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;

Expand Down Expand Up @@ -143,15 +144,10 @@ static ShardProfileId parseCompositeProfileShardId(String compositeId) {
Matcher m = SHARD_ID_DECOMPOSITION.matcher(compositeId);
if (m.find()) {
String nodeId = m.group(1);
String indexName = m.group(2);
String[] tokens = RemoteClusterAware.splitIndexName(m.group(2));
String cluster = tokens[0];
String indexName = tokens[1];
int shardId = Integer.parseInt(m.group(3));
String cluster = null;
if (indexName.contains(":")) {
// index names and cluster names cannot contain a ':', so this split should be accurate
String[] tokens = indexName.split(":", 2);
cluster = tokens[0];
indexName = tokens[1];
}
return new ShardProfileId(nodeId, indexName, shardId, cluster);
} else {
assert false : "Unable to match input against expected pattern of [nodeId][indexName][shardId]. Input: " + compositeId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,33 @@ protected static Set<String> getEnabledRemoteClusters(final Settings settings) {
return RemoteConnectionStrategy.getRemoteClusters(settings);
}

public static boolean isRemoteIndexName(String indexName) {
if (indexName.charAt(0) == '<' || indexName.startsWith("-<")) {
// This is date math, but eve if it is not, the remote can't start with '<'.
// Thus, whatever it is, this is definitely not a remote index.
return false;
}
// Note remote index name also can not start with ':'
return indexName.indexOf(RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR) > 0;
}

public static String[] splitIndexName(String indexName) {
if (indexName.charAt(0) == '<' || indexName.startsWith("-<")) {
// This is date math, but eve if it is not, the remote can't start with '<'.
// Thus, whatever it is, this is definitely not a remote index.
return new String[] { null, indexName };
}
int i = indexName.indexOf(RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR);
if (i == 0) {
throw new IllegalArgumentException("index name [" + indexName + "] is invalid because the remote part is empty");
}
if (i < 0) {
return new String[] { null, indexName };
} else {
return new String[] { indexName.substring(0, i), indexName.substring(i + 1) };
}
}

/**
* Groups indices per cluster by splitting remote cluster-alias, index-name pairs on {@link #REMOTE_CLUSTER_INDEX_SEPARATOR}. All
* indices per cluster are collected as a list in the returned map keyed by the cluster alias. Local indices are grouped under
Expand All @@ -77,18 +104,20 @@ protected Map<String, List<String>> groupClusterIndices(Set<String> remoteCluste
for (String index : requestIndices) {
// ensure that `index` is a remote name and not a datemath expression which includes ':' symbol
// Remote names can not start with '<' so we are assuming that if the first character is '<' then it is a datemath expression.
boolean isDateMathExpression = (index.charAt(0) == '<' || index.startsWith("-<"));
int i = index.indexOf(RemoteClusterService.REMOTE_CLUSTER_INDEX_SEPARATOR);
if (isDateMathExpression == false && i >= 0) {
String[] split = splitIndexName(index);
if (split[0] != null) {
if (isRemoteClusterClientEnabled == false) {
assert remoteClusterNames.isEmpty() : remoteClusterNames;
throw new IllegalArgumentException("node [" + nodeName + "] does not have the remote cluster client role enabled");
}
int startIdx = index.charAt(0) == '-' ? 1 : 0;
String remoteClusterName = index.substring(startIdx, i);
List<String> clusters = ClusterNameExpressionResolver.resolveClusterNames(remoteClusterNames, remoteClusterName);
String indexName = index.substring(i + 1);
if (startIdx == 1) {
String remoteClusterName = split[0];
String indexName = split[1];
boolean isNegative = remoteClusterName.startsWith("-");
List<String> clusters = ClusterNameExpressionResolver.resolveClusterNames(
remoteClusterNames,
isNegative ? remoteClusterName.substring(1) : remoteClusterName
);
if (isNegative) {
if (indexName.equals("*") == false) {
throw new IllegalArgumentException(
Strings.format(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -851,11 +851,9 @@ public static SearchHit searchHitFromMap(Map<String, Object> values) {
String index = get(SearchHit.Fields._INDEX, values, null);
String clusterAlias = null;
if (index != null) {
int indexOf = index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR);
if (indexOf > 0) {
clusterAlias = index.substring(0, indexOf);
index = index.substring(indexOf + 1);
}
String[] split = RemoteClusterAware.splitIndexName(index);
clusterAlias = split[0];
index = split[1];
}
ShardId shardId = get(SearchHit.Fields._SHARD, values, null);
String nodeId = get(SearchHit.Fields._NODE, values, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ private void remoteClusterLicense(final String clusterAlias, final ActionListene
* @return true if the collection of indices contains a remote index, otherwise false
*/
public static boolean isRemoteIndex(final String index) {
return index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR) != -1;
return RemoteClusterAware.isRemoteIndexName(index);
}

/**
Expand Down Expand Up @@ -275,7 +275,7 @@ public static List<String> remoteIndices(final Collection<String> indices) {
public static List<String> remoteClusterAliases(final Set<String> remoteClusters, final List<String> indices) {
return indices.stream()
.filter(RemoteClusterLicenseChecker::isRemoteIndex)
.map(index -> index.substring(0, index.indexOf(RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR)))
.map(index -> RemoteClusterAware.splitIndexName(index)[0])
.distinct()
.flatMap(clusterExpression -> ClusterNameExpressionResolver.resolveClusterNames(remoteClusters, clusterExpression).stream())
.distinct()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.common.network.InetAddresses;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
Expand All @@ -27,7 +28,6 @@
import java.util.StringJoiner;

import static java.util.stream.Collectors.toList;
import static org.elasticsearch.transport.RemoteClusterAware.REMOTE_CLUSTER_INDEX_SEPARATOR;
import static org.elasticsearch.transport.RemoteClusterAware.buildRemoteIndexName;
import static org.elasticsearch.xpack.esql.core.util.NumericUtils.isUnsignedLong;

Expand Down Expand Up @@ -378,10 +378,8 @@ public static String ordinal(int i) {
}

public static Tuple<String, String> splitQualifiedIndex(String indexName) {
int separatorOffset = indexName.indexOf(REMOTE_CLUSTER_INDEX_SEPARATOR);
return separatorOffset > 0
? Tuple.tuple(indexName.substring(0, separatorOffset), indexName.substring(separatorOffset + 1))
: Tuple.tuple(null, indexName);
String[] split = RemoteClusterAware.splitIndexName(indexName);
return Tuple.tuple(split[0], split[1]);
}

public static String qualifyAndJoinIndices(String cluster, String[] indices) {
Expand All @@ -393,7 +391,7 @@ public static String qualifyAndJoinIndices(String cluster, String[] indices) {
}

public static boolean isQualified(String indexWildcard) {
return indexWildcard.indexOf(REMOTE_CLUSTER_INDEX_SEPARATOR) > 0;
return RemoteClusterAware.isRemoteIndexName(indexWildcard);
}

public static boolean isInteger(String value) {
Expand Down
Loading

0 comments on commit 19dc3bb

Please sign in to comment.