Skip to content

Commit

Permalink
Add basic alias support for data streams
Browse files Browse the repository at this point in the history
Backporting elastic#72613 to 7.x.

Aliases to data streams can be defined via the existing update aliases api.
Aliases can either only refer to data streams or to indices (not both).
Also the existing get aliases api has been modified to support returning
aliases that refer to data streams.

Aliases for data streams are stored separately from data streams and
and refer to data streams by name and not to the backing indices of
a data stream. This means that when backing indices are added or removed
from a data stream that then the data stream alias doesn't need to be
updated.

The authorization model for aliases that refer to data streams is the
same as for aliases the refer to indices. In security privileges can
be defined on aliases, indices and data streams. When a privilege is
granted on an alias then access is also granted on the indices that
an alias refers to (irregardless whether privileges are granted or denied
on the actual indices). The same will apply for aliases that refer
to data streams. See for more details:
elastic#66163 (comment)

Relates to elastic#66163
  • Loading branch information
martijnvg committed May 11, 2021
1 parent 1966ec4 commit eac3aff
Show file tree
Hide file tree
Showing 33 changed files with 1,399 additions and 95 deletions.
98 changes: 98 additions & 0 deletions docs/reference/indices/aliases.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -511,3 +511,101 @@ POST /_aliases
}
--------------------------------------------------
// TEST[s/^/PUT test\nPUT test2\n/]

[[aliases-data-streams]]
===== Data stream aliases

An alias can also point to one or more data streams. To add a data stream to an
alias, specify the stream's name in the `index` parameter. Wildcards (`*`) are
supported. If a wildcard pattern matches both data streams and indices, the
action only uses the matching data streams.

You can only specify data streams in the `add` and `remove` actions. Aliases
that point to data streams do not support the following parameters:

* `filter`
* `index_routing`
* `is_write_index`
* `routing`
* `search_routing`

For example, the following request adds two data streams to the `logs` alias.

////
[source,console]
----
PUT _data_stream/logs-my_app-default
PUT _data_stream/logs-nginx.access-prod
----
////

[source,console]
----
POST _aliases
{
"actions": [
{ "add": { "index": "logs-my_app-default", "alias": "logs" }},
{ "add": { "index": "logs-nginx.access-prod", "alias": "logs" }}
]
}
----
// TEST[continued]

To verify the alias points to both data streams, use the
<<indices-get-alias,get index alias API>>.

[source,console]
----
GET logs-*/_alias
----
// TEST[continued]

The API returns:

[source,console-result]
----
{
"logs-my_app-default": {
"aliases": {
"logs": {}
}
},
"logs-nginx.access-prod": {
"aliases": {
"logs": {}
}
}
}
----

Use the `remove` action to remove a data stream from an alias.

[source,console]
----
POST _aliases
{
"actions": [
{ "remove": { "index": "logs-my_app-default", "alias": "logs" }}
]
}
GET logs-*/_alias
----
// TEST[continued]

The get index alias API returns:

[source,console-result]
----
{
"logs-my_app-default": {
"aliases": {}
},
"logs-nginx.access-prod": {
"aliases": {
"logs": {}
}
}
}
----
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,7 @@ public void testAliasNameExistingIndex() throws Exception {

InvalidAliasNameException e = expectThrows(InvalidAliasNameException.class,
() -> createIndex("test"));
assertThat(e.getMessage(), equalTo("Invalid alias name [index], an index exists with the same name as the alias"));
assertThat(e.getMessage(), equalTo("Invalid alias name [index]: an index or data stream exists with the same name as the alias"));
}

public void testAliasEmptyName() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,27 @@ protected void masterOperation(final IndicesAliasesRequest request, final Cluste
// Resolve all the AliasActions into AliasAction instances and gather all the aliases
Set<String> aliases = new HashSet<>();
for (AliasActions action : actions) {
List<String> concreteDataStreams =
indexNameExpressionResolver.dataStreamNames(state, request.indicesOptions(), action.indices());
if (concreteDataStreams.size() != 0) {
switch (action.actionType()) {
case ADD:
for (String dataStreamName : concreteDataStreams) {
finalActions.add(new AliasAction.AddDataStreamAlias(action.aliases()[0], dataStreamName));
}
break;
case REMOVE:
for (String dataStreamName : concreteDataStreams) {
finalActions.add(
new AliasAction.RemoveDataStreamAlias(action.aliases()[0], dataStreamName, action.mustExist()));
}
break;
default:
throw new IllegalArgumentException("Unsupported action [" + action.actionType() + "]");
}
continue;
}

final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request.indicesOptions(), false,
action.indices());
for (Index concreteIndex : concreteIndices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,9 @@ public IndicesOptions indicesOptions() {
public ActionRequestValidationException validate() {
return null;
}

@Override
public boolean includeDataStreams() {
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,34 +10,48 @@

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.DataStreamAlias;
import org.elasticsearch.cluster.metadata.DataStreamMetadata;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class GetAliasesResponse extends ActionResponse {

private final ImmutableOpenMap<String, List<AliasMetadata>> aliases;
private final Map<String, List<DataStreamAlias>> dataStreamAliases;

public GetAliasesResponse(ImmutableOpenMap<String, List<AliasMetadata>> aliases) {
public GetAliasesResponse(ImmutableOpenMap<String, List<AliasMetadata>> aliases, Map<String, List<DataStreamAlias>> dataStreamAliases) {
this.aliases = aliases;
this.dataStreamAliases = dataStreamAliases;
}

public GetAliasesResponse(StreamInput in) throws IOException {
super(in);
aliases = in.readImmutableMap(StreamInput::readString, i -> i.readList(AliasMetadata::new));
dataStreamAliases = in.getVersion().onOrAfter(DataStreamMetadata.DATA_STREAM_ALIAS_VERSION) ?
in.readMap(StreamInput::readString, in1 -> in1.readList(DataStreamAlias::new)) : Map.of();
}

public ImmutableOpenMap<String, List<AliasMetadata>> getAliases() {
return aliases;
}

public Map<String, List<DataStreamAlias>> getDataStreamAliases() {
return dataStreamAliases;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeMap(aliases, StreamOutput::writeString, StreamOutput::writeList);
if (out.getVersion().onOrAfter(DataStreamMetadata.DATA_STREAM_ALIAS_VERSION)) {
out.writeMap(dataStreamAliases, StreamOutput::writeString, StreamOutput::writeList);
}
}

@Override
Expand All @@ -49,11 +63,12 @@ public boolean equals(Object o) {
return false;
}
GetAliasesResponse that = (GetAliasesResponse) o;
return Objects.equals(aliases, that.aliases);
return Objects.equals(aliases, that.aliases) &&
Objects.equals(dataStreamAliases, that.dataStreamAliases);
}

@Override
public int hashCode() {
return Objects.hash(aliases);
return Objects.hash(aliases, dataStreamAliases);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.DataStreamAlias;
import org.elasticsearch.cluster.metadata.IndexAbstraction;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.regex.Regex;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.indices.SystemIndices.SystemIndexAccessLevel;
Expand All @@ -30,8 +33,10 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -67,7 +72,7 @@ protected void masterOperation(GetAliasesRequest request, ClusterState state, Ac
final SystemIndexAccessLevel systemIndexAccessLevel = indexNameExpressionResolver.getSystemIndexAccessLevel();
ImmutableOpenMap<String, List<AliasMetadata>> aliases = state.metadata().findAliases(request, concreteIndices);
listener.onResponse(new GetAliasesResponse(postProcess(request, concreteIndices, aliases, state,
systemIndexAccessLevel, threadPool.getThreadContext(), systemIndices)));
systemIndexAccessLevel, threadPool.getThreadContext(), systemIndices), postProcess(request, state)));
}

/**
Expand All @@ -80,6 +85,15 @@ static ImmutableOpenMap<String, List<AliasMetadata>> postProcess(GetAliasesReque
boolean noAliasesSpecified = request.getOriginalAliases() == null || request.getOriginalAliases().length == 0;
ImmutableOpenMap.Builder<String, List<AliasMetadata>> mapBuilder = ImmutableOpenMap.builder(aliases);
for (String index : concreteIndices) {
IndexAbstraction ia = state.metadata().getIndicesLookup().get(index);
assert ia.getType() == IndexAbstraction.Type.CONCRETE_INDEX;
if (ia.getParentDataStream() != null) {
// Don't include backing indices of data streams,
// because it is just noise. Aliases can't refer
// to backing indices directly.
continue;
}

if (aliases.get(index) == null && noAliasesSpecified) {
List<AliasMetadata> previous = mapBuilder.put(index, Collections.emptyList());
assert previous == null;
Expand All @@ -92,6 +106,21 @@ static ImmutableOpenMap<String, List<AliasMetadata>> postProcess(GetAliasesReque
return finalResponse;
}

Map<String, List<DataStreamAlias>> postProcess(GetAliasesRequest request, ClusterState state) {
Map<String, List<DataStreamAlias>> result = new HashMap<>();
boolean noAliasesSpecified = request.getOriginalAliases() == null || request.getOriginalAliases().length == 0;
List<String> requestedDataStreams =
indexNameExpressionResolver.dataStreamNames(state, request.indicesOptions(), request.indices());
for (String requestedDataStream : requestedDataStreams) {
List<DataStreamAlias> aliases = state.metadata().dataStreamAliases().values().stream()
.filter(alias -> alias.getDataStreams().contains(requestedDataStream))
.filter(alias -> noAliasesSpecified || Regex.simpleMatch(request.aliases(), alias.getName()))
.collect(Collectors.toList());
result.put(requestedDataStream, aliases);
}
return result;
}

private static void checkSystemIndexAccess(GetAliasesRequest request, SystemIndices systemIndices, ClusterState state,
ImmutableOpenMap<String, List<AliasMetadata>> aliasesMap,
SystemIndexAccessLevel systemIndexAccessLevel, ThreadContext threadContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -560,10 +560,9 @@ private static void enrichIndexAbstraction(String indexAbstraction, SortedMap<St
index.getParentDataStream() == null ? null : index.getParentDataStream().getName()));
break;
case ALIAS:
IndexAbstraction.Alias alias = (IndexAbstraction.Alias) ia;
String[] indexNames = alias.getIndices().stream().map(i -> i.getIndex().getName()).toArray(String[]::new);
String[] indexNames = ia.getIndices().stream().map(i -> i.getIndex().getName()).toArray(String[]::new);
Arrays.sort(indexNames);
aliases.add(new ResolvedAlias(alias.getName(), indexNames));
aliases.add(new ResolvedAlias(ia.getName(), indexNames));
break;
case DATA_STREAM:
IndexAbstraction.DataStream dataStream = (IndexAbstraction.DataStream) ia;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,4 +198,60 @@ boolean apply(NewAliasValidator aliasValidator, Metadata.Builder metadata, Index
throw new UnsupportedOperationException();
}
}

public static class AddDataStreamAlias extends AliasAction {

private final String aliasName;
private final String dataStreamName;

public AddDataStreamAlias(String aliasName, String dataStreamName) {
super(dataStreamName);
this.aliasName = aliasName;
this.dataStreamName = dataStreamName;
}

public String getAliasName() {
return aliasName;
}

public String getDataStreamName() {
return dataStreamName;
}

@Override
boolean removeIndex() {
return false;
}

@Override
boolean apply(NewAliasValidator aliasValidator, Metadata.Builder metadata, IndexMetadata index) {
aliasValidator.validate(aliasName, null, null, null);
return metadata.put(aliasName, dataStreamName);
}
}

public static class RemoveDataStreamAlias extends AliasAction {

private final String aliasName;
private final Boolean mustExist;
private final String dataStreamName;

public RemoveDataStreamAlias(String aliasName, String dataStreamName, Boolean mustExist) {
super(dataStreamName);
this.aliasName = aliasName;
this.mustExist = mustExist;
this.dataStreamName = dataStreamName;
}

@Override
boolean removeIndex() {
return false;
}

@Override
boolean apply(NewAliasValidator aliasValidator, Metadata.Builder metadata, IndexMetadata index) {
boolean mustExist = this.mustExist != null ? this.mustExist : false;
return metadata.removeDataStreamAlias(aliasName, dataStreamName, mustExist);
}
}
}
Loading

0 comments on commit eac3aff

Please sign in to comment.