Skip to content

Commit

Permalink
Add Fleet action results system data stream (#71966)
Browse files Browse the repository at this point in the history
This commit adds support for system data streams and also the first use
of a system data stream with the fleet action results data stream. A
system data stream is one that is used to store system data that users
should not interact with directly. Elasticsearch will manage these data
streams. REST API access is available for external system data streams
so that other stack components can store system data within a system
data stream. System data streams will not use the system index read and
write threadpools.

Backport of #71667
  • Loading branch information
jaymode authored Apr 20, 2021
1 parent 23d9b9b commit 5ab3092
Show file tree
Hide file tree
Showing 50 changed files with 1,752 additions and 290 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public final class DataStream {
private final List<String> indices;
private final long generation;
private final boolean hidden;
private final boolean system;
ClusterHealthStatus dataStreamStatus;
@Nullable
String indexTemplate;
Expand All @@ -36,7 +37,7 @@ public final class DataStream {

public DataStream(String name, String timeStampField, List<String> indices, long generation, ClusterHealthStatus dataStreamStatus,
@Nullable String indexTemplate, @Nullable String ilmPolicyName, @Nullable Map<String, Object> metadata,
boolean hidden) {
boolean hidden, boolean system) {
this.name = name;
this.timeStampField = timeStampField;
this.indices = indices;
Expand All @@ -46,6 +47,7 @@ public DataStream(String name, String timeStampField, List<String> indices, long
this.ilmPolicyName = ilmPolicyName;
this.metadata = metadata;
this.hidden = hidden;
this.system = system;
}

public String getName() {
Expand Down Expand Up @@ -84,6 +86,10 @@ public boolean isHidden() {
return hidden;
}

public boolean isSystem() {
return system;
}

public static final ParseField NAME_FIELD = new ParseField("name");
public static final ParseField TIMESTAMP_FIELD_FIELD = new ParseField("timestamp_field");
public static final ParseField INDICES_FIELD = new ParseField("indices");
Expand All @@ -93,6 +99,7 @@ public boolean isHidden() {
public static final ParseField ILM_POLICY_FIELD = new ParseField("ilm_policy");
public static final ParseField METADATA_FIELD = new ParseField("_meta");
public static final ParseField HIDDEN_FIELD = new ParseField("hidden");
public static final ParseField SYSTEM_FIELD = new ParseField("system");

@SuppressWarnings("unchecked")
private static final ConstructingObjectParser<DataStream, Void> PARSER = new ConstructingObjectParser<>("data_stream",
Expand All @@ -107,9 +114,10 @@ public boolean isHidden() {
String indexTemplate = (String) args[5];
String ilmPolicy = (String) args[6];
Map<String, Object> metadata = (Map<String, Object>) args[7];
Boolean hidden = (Boolean) args[8];
hidden = hidden != null && hidden;
return new DataStream(dataStreamName, timeStampField, indices, generation, status, indexTemplate, ilmPolicy, metadata, hidden);
boolean hidden = args[8] != null && (boolean) args[8];
boolean system = args[9] != null && (boolean) args[9];
return new DataStream(dataStreamName, timeStampField, indices, generation, status, indexTemplate, ilmPolicy, metadata, hidden,
system);
});

static {
Expand All @@ -122,6 +130,7 @@ public boolean isHidden() {
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), ILM_POLICY_FIELD);
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> p.map(), METADATA_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), HIDDEN_FIELD);
PARSER.declareBoolean(ConstructingObjectParser.optionalConstructorArg(), SYSTEM_FIELD);
}

public static DataStream fromXContent(XContentParser parser) throws IOException {
Expand All @@ -138,13 +147,16 @@ public boolean equals(Object o) {
timeStampField.equals(that.timeStampField) &&
indices.equals(that.indices) &&
dataStreamStatus == that.dataStreamStatus &&
hidden == that.hidden &&
system == that.system &&
Objects.equals(indexTemplate, that.indexTemplate) &&
Objects.equals(ilmPolicyName, that.ilmPolicyName) &&
Objects.equals(metadata, that.metadata);
}

@Override
public int hashCode() {
return Objects.hash(name, timeStampField, indices, generation, dataStreamStatus, indexTemplate, ilmPolicyName, metadata);
return Objects.hash(name, timeStampField, indices, generation, dataStreamStatus, indexTemplate, ilmPolicyName, metadata, hidden,
system);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,8 @@ stream's oldest backing index.
"generation": 2,
"status": "GREEN",
"template": "my-data-stream-template",
"hidden": false
"hidden": false,
"system": false
}
]
}
Expand Down
11 changes: 9 additions & 2 deletions docs/reference/indices/get-data-stream.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,11 @@ use the <<indices-get-settings,get index settings API>>.
`hidden`::
(Boolean)
If `true`, the data stream is <<hidden-indices,hidden>>.
`system`::
(Boolean)
If `true`, the data stream is created and managed by an Elastic stack component
and cannot be modified through normal user interaction.
====

[[get-data-stream-api-example]]
Expand Down Expand Up @@ -241,7 +246,8 @@ The API returns the following response:
"status": "GREEN",
"template": "my-index-template",
"ilm_policy": "my-lifecycle-policy",
"hidden": false
"hidden": false,
"system": false
},
{
"name": "my-data-stream-two",
Expand All @@ -261,7 +267,8 @@ The API returns the following response:
"status": "YELLOW",
"template": "my-index-template",
"ilm_policy": "my-lifecycle-policy",
"hidden": false
"hidden": false,
"system": false
}
]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver.SystemIndexAccessLevel;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.indices.SystemIndices.SystemIndexAccessLevel;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -65,11 +65,9 @@ protected void masterOperation(GetAliasesRequest request, ClusterState state, Ac
concreteIndices = indexNameExpressionResolver.concreteIndexNames(state, request);
}
final SystemIndexAccessLevel systemIndexAccessLevel = indexNameExpressionResolver.getSystemIndexAccessLevel();
final String elasticProduct =
threadPool.getThreadContext().getHeader(IndexNameExpressionResolver.EXTERNAL_SYSTEM_INDEX_ACCESS_CONTROL_HEADER_KEY);
ImmutableOpenMap<String, List<AliasMetadata>> aliases = state.metadata().findAliases(request, concreteIndices);
listener.onResponse(new GetAliasesResponse(postProcess(request, concreteIndices, aliases, state,
systemIndexAccessLevel, elasticProduct, systemIndices)));
systemIndexAccessLevel, threadPool.getThreadContext(), systemIndices)));
}

/**
Expand All @@ -78,7 +76,7 @@ protected void masterOperation(GetAliasesRequest request, ClusterState state, Ac
static ImmutableOpenMap<String, List<AliasMetadata>> postProcess(GetAliasesRequest request, String[] concreteIndices,
ImmutableOpenMap<String, List<AliasMetadata>> aliases,
ClusterState state, SystemIndexAccessLevel systemIndexAccessLevel,
String elasticProduct, SystemIndices systemIndices) {
ThreadContext threadContext, SystemIndices systemIndices) {
boolean noAliasesSpecified = request.getOriginalAliases() == null || request.getOriginalAliases().length == 0;
ImmutableOpenMap.Builder<String, List<AliasMetadata>> mapBuilder = ImmutableOpenMap.builder(aliases);
for (String index : concreteIndices) {
Expand All @@ -89,19 +87,19 @@ static ImmutableOpenMap<String, List<AliasMetadata>> postProcess(GetAliasesReque
}
final ImmutableOpenMap<String, List<AliasMetadata>> finalResponse = mapBuilder.build();
if (systemIndexAccessLevel != SystemIndexAccessLevel.ALL) {
checkSystemIndexAccess(request, systemIndices, state, finalResponse, systemIndexAccessLevel, elasticProduct);
checkSystemIndexAccess(request, systemIndices, state, finalResponse, systemIndexAccessLevel, threadContext);
}
return finalResponse;
}

private static void checkSystemIndexAccess(GetAliasesRequest request, SystemIndices systemIndices, ClusterState state,
ImmutableOpenMap<String, List<AliasMetadata>> aliasesMap,
SystemIndexAccessLevel systemIndexAccessLevel, String elasticProduct) {
SystemIndexAccessLevel systemIndexAccessLevel, ThreadContext threadContext) {
final Predicate<IndexMetadata> systemIndexAccessAllowPredicate;
if (systemIndexAccessLevel == SystemIndexAccessLevel.NONE) {
systemIndexAccessAllowPredicate = indexMetadata -> false;
} else if (systemIndexAccessLevel == SystemIndexAccessLevel.RESTRICTED) {
systemIndexAccessAllowPredicate = systemIndices.getProductSystemIndexMetadataPredicate(elasticProduct);
systemIndexAccessAllowPredicate = systemIndices.getProductSystemIndexMetadataPredicate(threadContext);
} else {
throw new IllegalArgumentException("Unexpected system index access level: " + systemIndexAccessLevel);
}
Expand All @@ -121,23 +119,23 @@ private static void checkSystemIndexAccess(GetAliasesRequest request, SystemIndi
"this request accesses system indices: {}, but in a future major version, direct access to system " +
"indices will be prevented by default", systemIndicesNames);
} else {
checkSystemAliasAccess(request, systemIndices, systemIndexAccessLevel, elasticProduct);
checkSystemAliasAccess(request, systemIndices, systemIndexAccessLevel, threadContext);
}
}

private static void checkSystemAliasAccess(GetAliasesRequest request, SystemIndices systemIndices,
SystemIndexAccessLevel systemIndexAccessLevel, String elasticProduct) {
SystemIndexAccessLevel systemIndexAccessLevel, ThreadContext threadContext) {
final Predicate<String> systemIndexAccessAllowPredicate;
if (systemIndexAccessLevel == SystemIndexAccessLevel.NONE) {
systemIndexAccessAllowPredicate = name -> true;
} else if (systemIndexAccessLevel == SystemIndexAccessLevel.RESTRICTED) {
systemIndexAccessAllowPredicate = systemIndices.getProductSystemIndexNamePredicate(elasticProduct).negate();
systemIndexAccessAllowPredicate = systemIndices.getProductSystemIndexNamePredicate(threadContext).negate();
} else {
throw new IllegalArgumentException("Unexpected system index access level: " + systemIndexAccessLevel);
}

final List<String> systemAliases = Arrays.stream(request.aliases())
.filter(systemIndices::isSystemIndex)
.filter(systemIndices::isSystemName)
.filter(systemIndexAccessAllowPredicate)
.collect(Collectors.toList());
if (systemAliases.isEmpty() == false) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.SystemDataStreamDescriptor;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -109,18 +110,25 @@ protected void masterOperation(CreateIndexRequest request,

@Override
public ClusterState execute(ClusterState currentState) throws Exception {
final SystemDataStreamDescriptor dataStreamDescriptor =
systemIndices.validateDataStreamAccess(request.index(), threadPool.getThreadContext());
final boolean isSystemDataStream = dataStreamDescriptor != null;
final boolean isSystemIndex = isSystemDataStream == false && systemIndices.isSystemIndex(request.index());
final ComposableIndexTemplate template = resolveTemplate(request, currentState.metadata());
final boolean isDataStream = isSystemIndex == false &&
(isSystemDataStream || (template != null && template.getDataStreamTemplate() != null));

if (template != null && template.getDataStreamTemplate() != null) {
if (isDataStream) {
// This expression only evaluates to true when the argument is non-null and false
if (Boolean.FALSE.equals(template.getAllowAutoCreate())) {
if (isSystemDataStream == false && Boolean.FALSE.equals(template.getAllowAutoCreate())) {
throw new IndexNotFoundException(
"composable template " + template.indexPatterns() + " forbids index auto creation"
);
}

CreateDataStreamClusterStateUpdateRequest createRequest = new CreateDataStreamClusterStateUpdateRequest(
request.index(),
dataStreamDescriptor,
request.masterNodeTimeout(),
request.timeout()
);
Expand All @@ -130,21 +138,27 @@ public ClusterState execute(ClusterState currentState) throws Exception {
} else {
String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
indexNameRef.set(indexName);
if (isSystemIndex) {
if (indexName.equals(request.index()) == false) {
throw new IllegalStateException("system indices do not support date math expressions");
}
} else {
// This will throw an exception if the index does not exist and creating it is prohibited
final boolean shouldAutoCreate = autoCreateIndex.shouldAutoCreate(indexName, currentState);

// This will throw an exception if the index does not exist and creating it is prohibited
final boolean shouldAutoCreate = autoCreateIndex.shouldAutoCreate(indexName, currentState);

if (shouldAutoCreate == false) {
// The index already exists.
return currentState;
if (shouldAutoCreate == false) {
// The index already exists.
return currentState;
}
}

final SystemIndexDescriptor mainDescriptor = systemIndices.findMatchingDescriptor(indexName);
final boolean isSystemIndex = mainDescriptor != null && mainDescriptor.isAutomaticallyManaged();
final SystemIndexDescriptor mainDescriptor =
isSystemIndex ? systemIndices.findMatchingDescriptor(indexName) : null;
final boolean isManagedSystemIndex = mainDescriptor != null && mainDescriptor.isAutomaticallyManaged();

final CreateIndexClusterStateUpdateRequest updateRequest;

if (isSystemIndex) {
if (isManagedSystemIndex) {
final SystemIndexDescriptor descriptor =
mainDescriptor.getDescriptorCompatibleWith(state.nodes().getSmallestNonClientNodeVersion());
if (descriptor == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.indices.SystemDataStreamDescriptor;

import java.util.HashMap;
import java.util.HashSet;
Expand All @@ -35,6 +36,7 @@ public class CreateIndexClusterStateUpdateRequest extends ClusterStateUpdateRequ
private Index recoverFrom;
private ResizeType resizeType;
private boolean copySettings;
private SystemDataStreamDescriptor systemDataStreamDescriptor;

private Settings settings = Settings.Builder.EMPTY_SETTINGS;

Expand Down Expand Up @@ -95,6 +97,11 @@ public CreateIndexClusterStateUpdateRequest nameResolvedInstant(long nameResolve
return this;
}

public CreateIndexClusterStateUpdateRequest systemDataStreamDescriptor(SystemDataStreamDescriptor systemDataStreamDescriptor) {
this.systemDataStreamDescriptor = systemDataStreamDescriptor;
return this;
}

public String cause() {
return cause;
}
Expand Down Expand Up @@ -123,6 +130,10 @@ public Index recoverFrom() {
return recoverFrom;
}

public SystemDataStreamDescriptor systemDataStreamDescriptor() {
return systemDataStreamDescriptor;
}

/**
* The name that was provided by the user. This might contain a date math expression.
* @see IndexMetadata#SETTING_INDEX_PROVIDED_NAME
Expand Down Expand Up @@ -178,6 +189,7 @@ public String toString() {
", aliases=" + aliases +
", blocks=" + blocks +
", waitForActiveShards=" + waitForActiveShards +
", systemDataStreamDescriptor=" + systemDataStreamDescriptor +
'}';
}
}
Loading

0 comments on commit 5ab3092

Please sign in to comment.