Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic alias support for data streams #72613

Merged
merged 6 commits into from
May 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions docs/reference/indices/aliases.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -511,3 +511,101 @@ POST /_aliases
}
--------------------------------------------------
// TEST[s/^/PUT test\nPUT test2\n/]

[[aliases-data-streams]]
===== Data stream aliases

An alias can also point to one or more data streams. To add a data stream to an
alias, specify the stream's name in the `index` parameter. Wildcards (`*`) are
supported. If a wildcard pattern matches both data streams and indices, the
action only uses the matching data streams.

You can only specify data streams in the `add` and `remove` actions. Aliases
that point to data streams do not support the following parameters:

* `filter`
* `index_routing`
* `is_write_index`
* `routing`
* `search_routing`

For example, the following request adds two data streams to the `logs` alias.

////
[source,console]
----
PUT _data_stream/logs-my_app-default

PUT _data_stream/logs-nginx.access-prod
----
////

[source,console]
----
POST _aliases
{
"actions": [
{ "add": { "index": "logs-my_app-default", "alias": "logs" }},
{ "add": { "index": "logs-nginx.access-prod", "alias": "logs" }}
]
}
----
// TEST[continued]

To verify the alias points to both data streams, use the
<<indices-get-alias,get index alias API>>.

[source,console]
----
GET logs-*/_alias
----
// TEST[continued]

The API returns:

[source,console-result]
----
{
"logs-my_app-default": {
"aliases": {
"logs": {}
}
},
"logs-nginx.access-prod": {
"aliases": {
"logs": {}
}
}
}
----

Use the `remove` action to remove a data stream from an alias.

[source,console]
----
POST _aliases
{
"actions": [
{ "remove": { "index": "logs-my_app-default", "alias": "logs" }}
]
}

GET logs-*/_alias
----
// TEST[continued]

The get index alias API returns:

[source,console-result]
----
{
"logs-my_app-default": {
"aliases": {}
},
"logs-nginx.access-prod": {
"aliases": {
"logs": {}
}
}
}
----
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ public void testAliasNameExistingIndex() throws Exception {

InvalidAliasNameException e = expectThrows(InvalidAliasNameException.class,
() -> createIndex("test"));
assertThat(e.getMessage(), equalTo("Invalid alias name [index], an index exists with the same name as the alias"));
assertThat(e.getMessage(), equalTo("Invalid alias name [index]: an index or data stream exists with the same name as the alias"));
}

public void testAliasEmptyName() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,27 @@ protected void masterOperation(Task task, final IndicesAliasesRequest request, f
// Resolve all the AliasActions into AliasAction instances and gather all the aliases
Set<String> aliases = new HashSet<>();
for (AliasActions action : actions) {
List<String> concreteDataStreams =
indexNameExpressionResolver.dataStreamNames(state, request.indicesOptions(), action.indices());
if (concreteDataStreams.size() != 0) {
switch (action.actionType()) {
case ADD:
for (String dataStreamName : concreteDataStreams) {
finalActions.add(new AliasAction.AddDataStreamAlias(action.aliases()[0], dataStreamName));
}
break;
case REMOVE:
for (String dataStreamName : concreteDataStreams) {
finalActions.add(
new AliasAction.RemoveDataStreamAlias(action.aliases()[0], dataStreamName, action.mustExist()));
}
break;
default:
throw new IllegalArgumentException("Unsupported action [" + action.actionType() + "]");
}
continue;
}

final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request.indicesOptions(), false,
action.indices());
for (Index concreteIndex : concreteIndices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,9 @@ public IndicesOptions indicesOptions() {
public ActionRequestValidationException validate() {
return null;
}

@Override
public boolean includeDataStreams() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,48 @@

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.DataStreamAlias;
import org.elasticsearch.cluster.metadata.DataStreamMetadata;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class GetAliasesResponse extends ActionResponse {

private final ImmutableOpenMap<String, List<AliasMetadata>> aliases;
private final Map<String, List<DataStreamAlias>> dataStreamAliases;

public GetAliasesResponse(ImmutableOpenMap<String, List<AliasMetadata>> aliases) {
public GetAliasesResponse(ImmutableOpenMap<String, List<AliasMetadata>> aliases, Map<String, List<DataStreamAlias>> dataStreamAliases) {
this.aliases = aliases;
this.dataStreamAliases = dataStreamAliases;
}

public GetAliasesResponse(StreamInput in) throws IOException {
super(in);
aliases = in.readImmutableMap(StreamInput::readString, i -> i.readList(AliasMetadata::new));
dataStreamAliases = in.getVersion().onOrAfter(DataStreamMetadata.DATA_STREAM_ALIAS_VERSION) ?
in.readMap(StreamInput::readString, in1 -> in1.readList(DataStreamAlias::new)) : Map.of();
}

public ImmutableOpenMap<String, List<AliasMetadata>> getAliases() {
return aliases;
}

public Map<String, List<DataStreamAlias>> getDataStreamAliases() {
return dataStreamAliases;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(aliases, StreamOutput::writeString, StreamOutput::writeList);
if (out.getVersion().onOrAfter(DataStreamMetadata.DATA_STREAM_ALIAS_VERSION)) {
out.writeMap(dataStreamAliases, StreamOutput::writeString, StreamOutput::writeList);
}
}

@Override
Expand All @@ -49,11 +63,12 @@ public boolean equals(Object o) {
return false;
}
GetAliasesResponse that = (GetAliasesResponse) o;
return Objects.equals(aliases, that.aliases);
return Objects.equals(aliases, that.aliases) &&
Objects.equals(dataStreamAliases, that.dataStreamAliases);
}

@Override
public int hashCode() {
return Objects.hash(aliases);
return Objects.hash(aliases, dataStreamAliases);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.DataStreamAlias;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.indices.SystemIndices.SystemIndexAccessLevel;
Expand All @@ -31,8 +34,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -68,7 +73,7 @@ protected void masterOperation(Task task, GetAliasesRequest request, ClusterStat
final SystemIndexAccessLevel systemIndexAccessLevel = indexNameExpressionResolver.getSystemIndexAccessLevel();
ImmutableOpenMap<String, List<AliasMetadata>> aliases = state.metadata().findAliases(request, concreteIndices);
listener.onResponse(new GetAliasesResponse(postProcess(request, concreteIndices, aliases, state,
systemIndexAccessLevel, threadPool.getThreadContext(), systemIndices)));
systemIndexAccessLevel, threadPool.getThreadContext(), systemIndices), postProcess(request, state)));
}

/**
Expand All @@ -81,6 +86,15 @@ static ImmutableOpenMap<String, List<AliasMetadata>> postProcess(GetAliasesReque
boolean noAliasesSpecified = request.getOriginalAliases() == null || request.getOriginalAliases().length == 0;
ImmutableOpenMap.Builder<String, List<AliasMetadata>> mapBuilder = ImmutableOpenMap.builder(aliases);
for (String index : concreteIndices) {
IndexAbstraction ia = state.metadata().getIndicesLookup().get(index);
assert ia.getType() == IndexAbstraction.Type.CONCRETE_INDEX;
if (ia.getParentDataStream() != null) {
// Don't include backing indices of data streams,
// because it is just noise. Aliases can't refer
// to backing indices directly.
continue;
}

if (aliases.get(index) == null && noAliasesSpecified) {
List<AliasMetadata> previous = mapBuilder.put(index, Collections.emptyList());
assert previous == null;
Expand All @@ -93,6 +107,21 @@ static ImmutableOpenMap<String, List<AliasMetadata>> postProcess(GetAliasesReque
return finalResponse;
}

Map<String, List<DataStreamAlias>> postProcess(GetAliasesRequest request, ClusterState state) {
Map<String, List<DataStreamAlias>> result = new HashMap<>();
boolean noAliasesSpecified = request.getOriginalAliases() == null || request.getOriginalAliases().length == 0;
List<String> requestedDataStreams =
indexNameExpressionResolver.dataStreamNames(state, request.indicesOptions(), request.indices());
for (String requestedDataStream : requestedDataStreams) {
List<DataStreamAlias> aliases = state.metadata().dataStreamAliases().values().stream()
.filter(alias -> alias.getDataStreams().contains(requestedDataStream))
.filter(alias -> noAliasesSpecified || Regex.simpleMatch(request.aliases(), alias.getName()))
.collect(Collectors.toList());
result.put(requestedDataStream, aliases);
}
return result;
}

private static void checkSystemIndexAccess(GetAliasesRequest request, SystemIndices systemIndices, ClusterState state,
ImmutableOpenMap<String, List<AliasMetadata>> aliasesMap,
SystemIndexAccessLevel systemIndexAccessLevel, ThreadContext threadContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,10 +558,9 @@ private static void enrichIndexAbstraction(String indexAbstraction, SortedMap<St
index.getParentDataStream() == null ? null : index.getParentDataStream().getName()));
break;
case ALIAS:
IndexAbstraction.Alias alias = (IndexAbstraction.Alias) ia;
String[] indexNames = alias.getIndices().stream().map(i -> i.getIndex().getName()).toArray(String[]::new);
String[] indexNames = ia.getIndices().stream().map(i -> i.getIndex().getName()).toArray(String[]::new);
Arrays.sort(indexNames);
aliases.add(new ResolvedAlias(alias.getName(), indexNames));
aliases.add(new ResolvedAlias(ia.getName(), indexNames));
break;
case DATA_STREAM:
IndexAbstraction.DataStream dataStream = (IndexAbstraction.DataStream) ia;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,60 @@ boolean apply(NewAliasValidator aliasValidator, Metadata.Builder metadata, Index
throw new UnsupportedOperationException();
}
}

public static class AddDataStreamAlias extends AliasAction {

private final String aliasName;
private final String dataStreamName;

public AddDataStreamAlias(String aliasName, String dataStreamName) {
super(dataStreamName);
this.aliasName = aliasName;
this.dataStreamName = dataStreamName;
}

public String getAliasName() {
return aliasName;
}

public String getDataStreamName() {
return dataStreamName;
}

@Override
boolean removeIndex() {
return false;
}

@Override
boolean apply(NewAliasValidator aliasValidator, Metadata.Builder metadata, IndexMetadata index) {
aliasValidator.validate(aliasName, null, null, null);
return metadata.put(aliasName, dataStreamName);
}
}

public static class RemoveDataStreamAlias extends AliasAction {

private final String aliasName;
private final Boolean mustExist;
private final String dataStreamName;

public RemoveDataStreamAlias(String aliasName, String dataStreamName, Boolean mustExist) {
super(dataStreamName);
this.aliasName = aliasName;
this.mustExist = mustExist;
this.dataStreamName = dataStreamName;
}

@Override
boolean removeIndex() {
return false;
}

@Override
boolean apply(NewAliasValidator aliasValidator, Metadata.Builder metadata, IndexMetadata index) {
boolean mustExist = this.mustExist != null ? this.mustExist : false;
return metadata.removeDataStreamAlias(aliasName, dataStreamName, mustExist);
}
}
}
Loading