Skip to content

Commit

Permalink
Add filter support to data stream aliases (elastic#74784)
Browse files Browse the repository at this point in the history
This allows specifying a query as filter on data stream alias,
which will then always be applied when searching via this alias.

Relates elastic#66163
  • Loading branch information
martijnvg authored and ywangd committed Jul 30, 2021
1 parent ed9699f commit f6a339e
Show file tree
Hide file tree
Showing 19 changed files with 788 additions and 174 deletions.
6 changes: 3 additions & 3 deletions docs/reference/indices/aliases.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ parameter.
// tag::alias-options[]
`filter`::
(Optional, <<query-dsl,Query DSL object>> Query used to limit documents the
alias can access. Data stream aliases don't support this parameter.
alias can access.
// end::alias-options[]
+
Only the `add` action supports this parameter.
Expand All @@ -109,7 +109,7 @@ indices return an error.
(Required*, array of strings) Data streams or indices for the action. Supports
wildcards (`*`). If `index` is not specified, this parameter is required. For
the `add` and `remove_index` actions, wildcard patterns that match both data
streams and indices return an error.
streams and indices return an error.

// tag::alias-options[]
`index_routing`::
Expand Down Expand Up @@ -140,7 +140,7 @@ Only the `add` action supports this parameter.
`must_exist`::
(Optional, Boolean)
If `true`, the alias must exist to perform the action. Defaults to `false`. Only
the `remove` action supports this parameter.
the `remove` action supports this parameter.

// tag::alias-options[]
`routing`::
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasAction;
import org.elasticsearch.cluster.metadata.AliasAction.AddDataStreamAlias;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.DataStreamAlias;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
Expand Down Expand Up @@ -108,9 +109,6 @@ protected void masterOperation(Task task, final IndicesAliasesRequest request, f
switch (action.actionType()) {
case ADD:
// Fail if parameters are used that data stream aliases don't support:
if (action.filter() != null) {
throw new IllegalArgumentException("aliases that point to data streams don't support filters");
}
if (action.routing() != null) {
throw new IllegalArgumentException("aliases that point to data streams don't support routing");
}
Expand All @@ -130,7 +128,7 @@ protected void masterOperation(Task task, final IndicesAliasesRequest request, f
}
for (String dataStreamName : concreteDataStreams) {
for (String alias : concreteDataStreamAliases(action, state.metadata(), dataStreamName)) {
finalActions.add(new AliasAction.AddDataStreamAlias(alias, dataStreamName, action.writeIndex()));
finalActions.add(new AddDataStreamAlias(alias, dataStreamName, action.writeIndex(), action.filter()));
}
}
continue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,14 @@ public static class AddDataStreamAlias extends AliasAction {
private final String aliasName;
private final String dataStreamName;
private final Boolean isWriteDataStream;
private final String filter;

public AddDataStreamAlias(String aliasName, String dataStreamName, Boolean isWriteDataStream) {
public AddDataStreamAlias(String aliasName, String dataStreamName, Boolean isWriteDataStream, String filter) {
super(dataStreamName);
this.aliasName = aliasName;
this.dataStreamName = dataStreamName;
this.isWriteDataStream = isWriteDataStream;
this.filter = filter;
}

public String getAliasName() {
Expand All @@ -231,8 +233,8 @@ boolean removeIndex() {

@Override
boolean apply(NewAliasValidator aliasValidator, Metadata.Builder metadata, IndexMetadata index) {
aliasValidator.validate(aliasName, null, null, isWriteDataStream);
return metadata.put(aliasName, dataStreamName, isWriteDataStream);
aliasValidator.validate(aliasName, null, filter, isWriteDataStream);
return metadata.put(aliasName, dataStreamName, isWriteDataStream, filter);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,28 @@
*/
package org.elasticsearch.cluster.metadata;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ParseField;
import org.elasticsearch.common.ParsingException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Predicate;
Expand All @@ -30,34 +38,77 @@ public class DataStreamAlias extends AbstractDiffable<DataStreamAlias> implement

public static final ParseField DATA_STREAMS_FIELD = new ParseField("data_streams");
public static final ParseField WRITE_DATA_STREAM_FIELD = new ParseField("write_data_stream");
public static final ParseField FILTER_FIELD = new ParseField("filter");

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<DataStreamAlias, String> PARSER = new ConstructingObjectParser<>(
"data_stream_alias",
false,
(args, name) -> new DataStreamAlias(name, (List<String>) args[0], (String) args[1])
(args, name) -> new DataStreamAlias(name, (List<String>) args[0], (String) args[1], (CompressedXContent) args[2])
);

static {
PARSER.declareStringArray(ConstructingObjectParser.constructorArg(), DATA_STREAMS_FIELD);
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), WRITE_DATA_STREAM_FIELD);
PARSER.declareField(
ConstructingObjectParser.optionalConstructorArg(),
(p, c) -> {
if (p.currentToken() == XContentParser.Token.VALUE_EMBEDDED_OBJECT ||
p.currentToken() == XContentParser.Token.VALUE_STRING) {
return new CompressedXContent(p.binaryValue());
} else if (p.currentToken() == XContentParser.Token.START_OBJECT) {
XContentBuilder builder = XContentFactory.jsonBuilder().map(p.mapOrdered());
return new CompressedXContent(BytesReference.bytes(builder));
} else {
assert false : "unexpected token [" + p.currentToken() + " ]";
return null;
}
},
FILTER_FIELD,
ObjectParser.ValueType.VALUE_OBJECT_ARRAY
);
}

private final String name;
private final List<String> dataStreams;
private final String writeDataStream;
private final CompressedXContent filter;

public DataStreamAlias(String name, List<String> dataStreams, String writeDataStream) {
private DataStreamAlias(String name, List<String> dataStreams, String writeDataStream, CompressedXContent filter) {
this.name = Objects.requireNonNull(name);
this.dataStreams = List.copyOf(dataStreams);
this.writeDataStream = writeDataStream;
this.filter = filter;
assert writeDataStream == null || dataStreams.contains(writeDataStream);
}

public DataStreamAlias(String name, List<String> dataStreams, String writeDataStream, Map<String, Object> filter) {
this(name, dataStreams, writeDataStream, compress(filter));
}

private static CompressedXContent compress(Map<String, Object> filterAsMap) {
if (filterAsMap == null) {
return null;
}

try {
XContentBuilder builder = XContentFactory.jsonBuilder().map(filterAsMap);
return new CompressedXContent(BytesReference.bytes(builder));
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

private static Map<String, Object> decompress(CompressedXContent filter) {
String filterAsString = filter.string();
return XContentHelper.convertToMap(XContentFactory.xContent(filterAsString), filterAsString, true);
}

public DataStreamAlias(StreamInput in) throws IOException {
this.name = in.readString();
this.dataStreams = in.readStringList();
this.writeDataStream = in.readOptionalString();
this.filter = in.getVersion().onOrAfter(Version.V_8_0_0) && in.readBoolean() ? CompressedXContent.readCompressedString(in) : null;
}

/**
Expand Down Expand Up @@ -85,15 +136,20 @@ public String getWriteDataStream() {
return writeDataStream;
}

public CompressedXContent getFilter() {
return filter;
}

/**
* Returns a new {@link DataStreamAlias} instance with the provided data stream name added to it as a new member.
* If the provided isWriteDataStream is set to <code>true</code> then the provided data stream is also set as write data stream.
* If the provided isWriteDataStream is set to <code>false</code> and the provided data stream is also the write data stream of
* this instance then the returned data stream alias instance's write data stream is unset.
* If the provided filter is the same as the filter of this alias then this instance isn't updated, otherwise it is updated.
*
* The same instance is returned if the attempted addition of the provided data stream didn't change this instance.
*/
public DataStreamAlias addDataStream(String dataStream, Boolean isWriteDataStream) {
public DataStreamAlias update(String dataStream, Boolean isWriteDataStream, Map<String, Object> filterAsMap) {
String writeDataStream = this.writeDataStream;
if (isWriteDataStream != null) {
if (isWriteDataStream) {
Expand All @@ -105,10 +161,24 @@ public DataStreamAlias addDataStream(String dataStream, Boolean isWriteDataStrea
}
}

boolean filterUpdated;
CompressedXContent filter;
if (filterAsMap != null) {
filter = compress(filterAsMap);
if (this.filter == null) {
filterUpdated = true;
} else {
filterUpdated = filterAsMap.equals(decompress(this.filter)) == false;
}
} else {
filter = this.filter;
filterUpdated = false;
}

Set<String> dataStreams = new HashSet<>(this.dataStreams);
boolean added = dataStreams.add(dataStream);
if (added || Objects.equals(this.writeDataStream, writeDataStream) == false) {
return new DataStreamAlias(name, List.copyOf(dataStreams), writeDataStream);
if (added || Objects.equals(this.writeDataStream, writeDataStream) == false || filterUpdated) {
return new DataStreamAlias(name, List.copyOf(dataStreams), writeDataStream, filter);
} else {
return this;
}
Expand All @@ -133,7 +203,7 @@ public DataStreamAlias removeDataStream(String dataStream) {
if (dataStream.equals(writeDataStream)) {
writeDataStream = null;
}
return new DataStreamAlias(name, List.copyOf(dataStreams), writeDataStream);
return new DataStreamAlias(name, List.copyOf(dataStreams), writeDataStream, filter);
}
}

Expand All @@ -152,7 +222,7 @@ public DataStreamAlias intersect(Predicate<String> filter) {
if (intersectingDataStreams.contains(writeDataStream) == false) {
writeDataStream = null;
}
return new DataStreamAlias(this.name, intersectingDataStreams, writeDataStream);
return new DataStreamAlias(this.name, intersectingDataStreams, writeDataStream, this.filter);
}

/**
Expand All @@ -171,7 +241,7 @@ public DataStreamAlias merge(DataStreamAlias other) {
}
}

return new DataStreamAlias(this.name, List.copyOf(mergedDataStreams), writeDataStream);
return new DataStreamAlias(this.name, List.copyOf(mergedDataStreams), writeDataStream, filter);
}

/**
Expand All @@ -187,7 +257,7 @@ public DataStreamAlias renameDataStreams(String renamePattern, String renameRepl
if (writeDataStream != null) {
writeDataStream = writeDataStream.replaceAll(renamePattern, renameReplacement);
}
return new DataStreamAlias(this.name, renamedDataStreams, writeDataStream);
return new DataStreamAlias(this.name, renamedDataStreams, writeDataStream, filter);
}

public static Diff<DataStreamAlias> readDiffFrom(StreamInput in) throws IOException {
Expand All @@ -210,6 +280,14 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (writeDataStream != null) {
builder.field(WRITE_DATA_STREAM_FIELD.getPreferredName(), writeDataStream);
}
if (filter != null) {
boolean binary = params.paramAsBoolean("binary", false);
if (binary) {
builder.field("filter", filter.compressed());
} else {
builder.field("filter", XContentHelper.convertToMap(filter.uncompressed(), true).v2());
}
}
builder.endObject();
return builder;
}
Expand All @@ -219,6 +297,14 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeString(name);
out.writeStringCollection(dataStreams);
out.writeOptionalString(writeDataStream);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
if (filter != null) {
out.writeBoolean(true);
filter.writeTo(out);
} else {
out.writeBoolean(false);
}
}
}

@Override
Expand All @@ -228,11 +314,22 @@ public boolean equals(Object o) {
DataStreamAlias that = (DataStreamAlias) o;
return Objects.equals(name, that.name) &&
Objects.equals(dataStreams, that.dataStreams) &&
Objects.equals(writeDataStream, that.writeDataStream);
Objects.equals(writeDataStream, that.writeDataStream) &&
Objects.equals(filter, that.filter);
}

@Override
public int hashCode() {
return Objects.hash(name, dataStreams, writeDataStream);
return Objects.hash(name, dataStreams, writeDataStream, filter);
}

@Override
public String toString() {
return "DataStreamAlias{" +
"name='" + name + '\'' +
", dataStreams=" + dataStreams +
", writeDataStream='" + writeDataStream + '\'' +
", filter=" + filter.string() +
'}';
}
}
Loading

0 comments on commit f6a339e

Please sign in to comment.