Skip to content

Commit

Permalink
Ignore matching data streams if include_data_streams is false (#57900)
Browse files Browse the repository at this point in the history
  • Loading branch information
danhermann authored Jul 3, 2020
1 parent 4579593 commit c3aaf33
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 94 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ tasks.register("verifyVersions") {
* after the backport of the backcompat code is complete.
*/

boolean bwc_tests_enabled = true
final String bwc_tests_disabled_issue = "" /* place a PR link here when committing bwc changes */
boolean bwc_tests_enabled = false
final String bwc_tests_disabled_issue = "https://github.com/elastic/elasticsearch/pull/57900" /* place a PR link here when committing bwc changes */
if (bwc_tests_enabled == false) {
if (bwc_tests_disabled_issue.isEmpty()) {
throw new GradleException("bwc_tests_disabled_issue must be set when bwc_tests_enabled == false")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
- match: { hits.hits.0._source.foo: 'bar' }

- do:
catch: bad_request
catch: missing
indices.delete:
index: logs-foobar

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import org.elasticsearch.action.admin.indices.template.delete.DeleteComposableIndexTemplateAction;
import org.elasticsearch.action.admin.indices.template.put.PutComposableIndexTemplateAction;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryRequestBuilder;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
Expand Down Expand Up @@ -209,34 +208,22 @@ public void testOtherWriteOps() throws Exception {
CreateDataStreamAction.Request createDataStreamRequest = new CreateDataStreamAction.Request(dataStreamName);
client().admin().indices().createDataStream(createDataStreamRequest).get();

{
BulkRequest bulkRequest = new BulkRequest()
.add(new IndexRequest(dataStreamName).source("{\"@timestamp1\": \"2020-12-12\"}", XContentType.JSON));
expectFailure(dataStreamName, () -> client().bulk(bulkRequest).actionGet());
}
{
BulkRequest bulkRequest = new BulkRequest()
.add(new DeleteRequest(dataStreamName, "_id"));
expectFailure(dataStreamName, () -> client().bulk(bulkRequest).actionGet());
}
{
BulkRequest bulkRequest = new BulkRequest()
.add(new UpdateRequest(dataStreamName, "_id").doc("{\"@timestamp1\": \"2020-12-12\"}", XContentType.JSON));
expectFailure(dataStreamName, () -> client().bulk(bulkRequest).actionGet());
}
{
IndexRequest indexRequest = new IndexRequest(dataStreamName)
.source("{\"@timestamp1\": \"2020-12-12\"}", XContentType.JSON);
expectFailure(dataStreamName, () -> client().index(indexRequest).actionGet());
Exception e = expectThrows(IndexNotFoundException.class, () -> client().index(indexRequest).actionGet());
assertThat(e.getMessage(), equalTo("no such index [null]"));
}
{
UpdateRequest updateRequest = new UpdateRequest(dataStreamName, "_id")
.doc("{}", XContentType.JSON);
expectFailure(dataStreamName, () -> client().update(updateRequest).actionGet());
Exception e = expectThrows(IndexNotFoundException.class, () -> client().update(updateRequest).actionGet());
assertThat(e.getMessage(), equalTo("no such index [null]"));
}
{
DeleteRequest deleteRequest = new DeleteRequest(dataStreamName, "_id");
expectFailure(dataStreamName, () -> client().delete(deleteRequest).actionGet());
Exception e = expectThrows(IndexNotFoundException.class, () -> client().delete(deleteRequest).actionGet());
assertThat(e.getMessage(), equalTo("no such index [null]"));
}
{
IndexRequest indexRequest = new IndexRequest(dataStreamName)
Expand Down Expand Up @@ -428,7 +415,7 @@ public void testResolvabilityOfDataStreamsInAPIs() throws Exception {
verifyResolvability(wildcardExpression, client().admin().indices().prepareUpgrade(wildcardExpression), false);
verifyResolvability(wildcardExpression, client().admin().indices().prepareRecoveries(wildcardExpression), false);
verifyResolvability(wildcardExpression, client().admin().indices().prepareUpgradeStatus(wildcardExpression), false);
verifyResolvability(wildcardExpression, getAliases(wildcardExpression), true);
verifyResolvability(wildcardExpression, getAliases(wildcardExpression), false);
verifyResolvability(wildcardExpression, getFieldMapping(wildcardExpression), false);
verifyResolvability(wildcardExpression,
putMapping("{\"_doc\":{\"properties\": {\"my_field\":{\"type\":\"keyword\"}}}}", wildcardExpression), false);
Expand Down Expand Up @@ -477,7 +464,8 @@ public void testAliasActionsFailOnDataStreams() throws Exception {
.index(dataStreamName).aliases("foo");
IndicesAliasesRequest aliasesAddRequest = new IndicesAliasesRequest();
aliasesAddRequest.addAliasAction(addAction);
expectFailure(dataStreamName, () -> client().admin().indices().aliases(aliasesAddRequest).actionGet());
Exception e = expectThrows(IndexNotFoundException.class, () -> client().admin().indices().aliases(aliasesAddRequest).actionGet());
assertThat(e.getMessage(), equalTo("no such index [" + dataStreamName +"]"));
}

public void testAliasActionsFailOnDataStreamBackingIndices() throws Exception {
Expand Down Expand Up @@ -789,19 +777,18 @@ private static void verifyResolvability(String dataStream, ActionRequestBuilder

private static void verifyResolvability(String dataStream, ActionRequestBuilder requestBuilder, boolean fail, long expectedCount) {
if (fail) {
String expectedErrorMessage = "The provided expression [" + dataStream +
"] matches a data stream, specify the corresponding concrete indices instead.";
String expectedErrorMessage = "no such index [" + dataStream + "]";
if (requestBuilder instanceof MultiSearchRequestBuilder) {
MultiSearchResponse multiSearchResponse = ((MultiSearchRequestBuilder) requestBuilder).get();
assertThat(multiSearchResponse.getResponses().length, equalTo(1));
assertThat(multiSearchResponse.getResponses()[0].isFailure(), is(true));
assertThat(multiSearchResponse.getResponses()[0].getFailure(), instanceOf(IllegalArgumentException.class));
assertThat(multiSearchResponse.getResponses()[0].getFailure().getMessage(), equalTo(expectedErrorMessage));
} else if (requestBuilder instanceof ValidateQueryRequestBuilder) {
ValidateQueryResponse response = (ValidateQueryResponse) requestBuilder.get();
assertThat(response.getQueryExplanation().get(0).getError(), equalTo(expectedErrorMessage));
Exception e = expectThrows(IndexNotFoundException.class, requestBuilder::get);
assertThat(e.getMessage(), equalTo(expectedErrorMessage));
} else {
Exception e = expectThrows(IllegalArgumentException.class, requestBuilder::get);
Exception e = expectThrows(IndexNotFoundException.class, requestBuilder::get);
assertThat(e.getMessage(), equalTo(expectedErrorMessage));
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ Index[] concreteIndices(Context context, String... indexExpressions) {
}
} else if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM &&
context.includeDataStreams() == false) {
throw dataStreamsNotSupportedException(expression);
continue;
}

if (indexAbstraction.getType() == IndexAbstraction.Type.ALIAS && context.isResolveToWriteIndex()) {
Expand Down Expand Up @@ -298,11 +298,6 @@ private static IllegalArgumentException aliasesNotSupportedException(String expr
"alias, specify the corresponding concrete indices instead.");
}

private static IllegalArgumentException dataStreamsNotSupportedException(String expression) {
return new IllegalArgumentException("The provided expression [" + expression + "] matches a " +
"data stream, specify the corresponding concrete indices instead.");
}

/**
* Utility method that allows to resolve an index expression to its corresponding single concrete index.
* Callers should make sure they provide proper {@link org.elasticsearch.action.support.IndicesOptions}
Expand Down Expand Up @@ -354,7 +349,12 @@ public Index concreteWriteIndex(ClusterState state, IndicesRequest request) {
*/
public Index concreteWriteIndex(ClusterState state, IndicesOptions options, String index, boolean allowNoIndices,
boolean includeDataStreams) {
Context context = new Context(state, options, false, true, includeDataStreams);
IndicesOptions combinedOptions = IndicesOptions.fromOptions(options.ignoreUnavailable(), allowNoIndices,
options.expandWildcardsOpen(), options.expandWildcardsClosed(), options.expandWildcardsHidden(),
options.allowAliasesToMultipleIndices(), options.forbidClosedIndices(), options.ignoreAliases(),
options.ignoreThrottled());

Context context = new Context(state, combinedOptions, false, true, includeDataStreams);
Index[] indices = concreteIndices(context, index);
if (allowNoIndices && indices.length == 0) {
return null;
Expand Down Expand Up @@ -729,10 +729,6 @@ public List<String> resolve(Context context, List<String> expressions) {
}

if (isEmptyOrTrivialWildcard(expressions)) {
if (context.includeDataStreams() == false && metadata.dataStreams().isEmpty() == false) {
throw dataStreamsNotSupportedException(expressions.toString());
}

List<String> resolvedExpressions = resolveEmptyOrTrivialWildcard(options, metadata);
if (context.includeDataStreams()) {
final IndexMetadata.State excludeState = excludeState(options);
Expand Down Expand Up @@ -797,8 +793,8 @@ private Set<String> innerResolve(Context context, List<String> expressions, Indi
} else if (indexAbstraction.getType() == IndexAbstraction.Type.ALIAS && options.ignoreAliases()) {
throw aliasesNotSupportedException(expression);
} else if (indexAbstraction.getType() == IndexAbstraction.Type.DATA_STREAM &&
context.includeDataStreams() == false) {
throw dataStreamsNotSupportedException(expression);
context.includeDataStreams() == false) {
throw indexNotFoundException(expression);
}
}
if (add) {
Expand Down Expand Up @@ -878,7 +874,7 @@ private static IndexMetadata.State excludeState(IndicesOptions options) {

public static Map<String, IndexAbstraction> matches(Context context, Metadata metadata, String expression) {
if (Regex.isMatchAllPattern(expression)) {
return filterIndicesLookup(context, metadata.getIndicesLookup(), null, expression, context.getOptions());
return filterIndicesLookup(context, metadata.getIndicesLookup(), null, context.getOptions());
} else if (expression.indexOf("*") == expression.length() - 1) {
return suffixWildcard(context, metadata, expression);
} else {
Expand All @@ -893,18 +889,17 @@ private static Map<String, IndexAbstraction> suffixWildcard(Context context, Met
toPrefixCharArr[toPrefixCharArr.length - 1]++;
String toPrefix = new String(toPrefixCharArr);
SortedMap<String, IndexAbstraction> subMap = metadata.getIndicesLookup().subMap(fromPrefix, toPrefix);
return filterIndicesLookup(context, subMap, null, expression, context.getOptions());
return filterIndicesLookup(context, subMap, null, context.getOptions());
}

private static Map<String, IndexAbstraction> otherWildcard(Context context, Metadata metadata, String expression) {
final String pattern = expression;
return filterIndicesLookup(context, metadata.getIndicesLookup(), e -> Regex.simpleMatch(pattern, e.getKey()),
expression, context.getOptions());
context.getOptions());
}

private static Map<String, IndexAbstraction> filterIndicesLookup(Context context, SortedMap<String, IndexAbstraction> indicesLookup,
Predicate<? super Map.Entry<String, IndexAbstraction>> filter,
String expression,
IndicesOptions options) {
boolean shouldConsumeStream = false;
Stream<Map.Entry<String, IndexAbstraction>> stream = indicesLookup.entrySet().stream();
Expand All @@ -918,11 +913,7 @@ private static Map<String, IndexAbstraction> filterIndicesLookup(Context context
}
if (context.includeDataStreams() == false) {
shouldConsumeStream = true;
stream = stream.peek(e -> {
if (e.getValue().getType() == IndexAbstraction.Type.DATA_STREAM) {
throw dataStreamsNotSupportedException(expression);
}
});
stream = stream.filter(e -> e.getValue().getType() != IndexAbstraction.Type.DATA_STREAM);
}
if (shouldConsumeStream) {
return stream.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
Expand Down
Loading

0 comments on commit c3aaf33

Please sign in to comment.