Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add infrastructure for managing system indices #65604

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@
*/
package org.elasticsearch.action.admin.indices.create;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.ActiveShardsObserver;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -40,18 +43,24 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

/**
* Api that auto creates an index or data stream that originate from requests that write into an index that doesn't yet exist.
*/
public final class AutoCreateAction extends ActionType<CreateIndexResponse> {

private static final Logger logger = LogManager.getLogger(AutoCreateAction.class);

public static final AutoCreateAction INSTANCE = new AutoCreateAction();
public static final String NAME = "indices:admin/auto_create";

Expand All @@ -65,15 +74,17 @@ public static final class TransportAction extends TransportMasterNodeAction<Crea
private final MetadataCreateIndexService createIndexService;
private final MetadataCreateDataStreamService metadataCreateDataStreamService;
private final AutoCreateIndex autoCreateIndex;
private final SystemIndices systemIndices;

@Inject
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
MetadataCreateIndexService createIndexService,
MetadataCreateDataStreamService metadataCreateDataStreamService,
AutoCreateIndex autoCreateIndex) {
AutoCreateIndex autoCreateIndex, SystemIndices systemIndices) {
super(NAME, transportService, clusterService, threadPool, actionFilters, CreateIndexRequest::new, indexNameExpressionResolver,
CreateIndexResponse::new, ThreadPool.Names.SAME);
this.systemIndices = systemIndices;
this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
this.createIndexService = createIndexService;
this.metadataCreateDataStreamService = metadataCreateDataStreamService;
Expand Down Expand Up @@ -141,12 +152,51 @@ public ClusterState execute(ClusterState currentState) throws Exception {
return currentState;
}

CreateIndexClusterStateUpdateRequest updateRequest =
new CreateIndexClusterStateUpdateRequest(request.cause(), indexName, request.index())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout());
final SystemIndexDescriptor descriptor = systemIndices.findMatchingDescriptor(indexName);
CreateIndexClusterStateUpdateRequest updateRequest = descriptor != null && descriptor.isAutomaticallyManaged()
? buildSystemIndexUpdateRequest(descriptor)
: buildUpdateRequest(indexName);

return createIndexService.applyCreateIndexRequest(currentState, updateRequest, false);
}
}

private CreateIndexClusterStateUpdateRequest buildUpdateRequest(String indexName) {
CreateIndexClusterStateUpdateRequest updateRequest =
new CreateIndexClusterStateUpdateRequest(request.cause(), indexName, request.index())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout());
logger.debug("Auto-creating index {}", indexName);
return updateRequest;
}

private CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateRequest(SystemIndexDescriptor descriptor) {
String mappings = descriptor.getMappings();
Settings settings = descriptor.getSettings();
String aliasName = descriptor.getAliasName();
String concreteIndexName = descriptor.getPrimaryIndex();

CreateIndexClusterStateUpdateRequest updateRequest =
new CreateIndexClusterStateUpdateRequest(request.cause(), concreteIndexName, request.index())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout());

updateRequest.waitForActiveShards(ActiveShardCount.ALL);

if (mappings != null) {
updateRequest.mappings(mappings);
}
if (settings != null) {
updateRequest.settings(settings);
}
if (aliasName != null) {
updateRequest.aliases(Set.of(new Alias(aliasName)));
}

logger.debug("Auto-creating system index {}", concreteIndexName);

return updateRequest;
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
package org.elasticsearch.action.admin.indices.create;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand All @@ -29,24 +31,33 @@
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.Objects;
import java.util.Set;

/**
* Create index action.
*/
public class TransportCreateIndexAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {

private final MetadataCreateIndexService createIndexService;
private final SystemIndices systemIndices;

@Inject
public TransportCreateIndexAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetadataCreateIndexService createIndexService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
SystemIndices systemIndices) {
super(CreateIndexAction.NAME, transportService, clusterService, threadPool, actionFilters, CreateIndexRequest::new,
indexNameExpressionResolver, CreateIndexResponse::new, ThreadPool.Names.SAME);
this.createIndexService = createIndexService;
this.systemIndices = systemIndices;
}

@Override
Expand All @@ -58,20 +69,55 @@ protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterSt
protected void masterOperation(Task task, final CreateIndexRequest request, final ClusterState state,
final ActionListener<CreateIndexResponse> listener) {
String cause = request.cause();
if (cause.length() == 0) {
if (cause.isEmpty()) {
cause = "api";
}

final String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
final CreateIndexClusterStateUpdateRequest updateRequest =
new CreateIndexClusterStateUpdateRequest(cause, indexName, request.index())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.settings(request.settings()).mappings(request.mappings())
.aliases(request.aliases())
.waitForActiveShards(request.waitForActiveShards());

final SystemIndexDescriptor descriptor = systemIndices.findMatchingDescriptor(indexName);
final CreateIndexClusterStateUpdateRequest updateRequest = descriptor != null && descriptor.isAutomaticallyManaged()
? buildSystemIndexUpdateRequest(request, cause, descriptor)
: buildUpdateRequest(request, cause, indexName);

createIndexService.createIndex(updateRequest, listener.map(response ->
new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));
}

private CreateIndexClusterStateUpdateRequest buildUpdateRequest(CreateIndexRequest request, String cause, String indexName) {
return new CreateIndexClusterStateUpdateRequest(cause, indexName, request.index()).ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout())
.settings(request.settings())
.mappings(request.mappings())
.aliases(request.aliases())
.waitForActiveShards(request.waitForActiveShards());
}

private CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateRequest(
CreateIndexRequest request,
String cause,
SystemIndexDescriptor descriptor
) {
final Settings settings = Objects.requireNonNullElse(descriptor.getSettings(), Settings.EMPTY);

final Set<Alias> aliases;
if (descriptor.getAliasName() == null) {
aliases = Set.of();
} else {
aliases = Set.of(new Alias(descriptor.getAliasName()));
}

final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(
cause,
descriptor.getPrimaryIndex(),
request.index()
);

return updateRequest.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout())
.aliases(aliases)
.waitForActiveShards(ActiveShardCount.ALL)
.mappings(descriptor.getMappings())
.settings(settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand All @@ -55,6 +57,7 @@ public class TransportPutMappingAction extends AcknowledgedTransportMasterNodeAc

private final MetadataMappingService metadataMappingService;
private final RequestValidators<PutMappingRequest> requestValidators;
private final SystemIndices systemIndices;

@Inject
public TransportPutMappingAction(
Expand All @@ -64,11 +67,13 @@ public TransportPutMappingAction(
final MetadataMappingService metadataMappingService,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver,
final RequestValidators<PutMappingRequest> requestValidators) {
final RequestValidators<PutMappingRequest> requestValidators,
final SystemIndices systemIndices) {
super(PutMappingAction.NAME, transportService, clusterService, threadPool, actionFilters, PutMappingRequest::new,
indexNameExpressionResolver, ThreadPool.Names.SAME);
this.metadataMappingService = metadataMappingService;
this.requestValidators = Objects.requireNonNull(requestValidators);
this.systemIndices = systemIndices;
}

@Override
Expand All @@ -87,12 +92,25 @@ protected void masterOperation(Task task, final PutMappingRequest request, final
final ActionListener<AcknowledgedResponse> listener) {
try {
final Index[] concreteIndices = resolveIndices(state, request, indexNameExpressionResolver);
final String mappingSource = request.source();

final Optional<Exception> maybeValidationException = requestValidators.validateRequest(request, state, concreteIndices);
if (maybeValidationException.isPresent()) {
listener.onFailure(maybeValidationException.get());
return;
}

final List<String> violations = checkForSystemIndexViolations(concreteIndices, mappingSource);
if (violations.isEmpty() == false) {
final String message = "Cannot update mappings in "
+ violations
+ ": system indices can only use mappings from their descriptors, "
+ "but the mappings in the request did not match those in the descriptors(s)";
logger.warn(message);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On backport, we will need to make this a deprecation warning

listener.onFailure(new IllegalArgumentException(message));
return;
}

performMappingUpdate(concreteIndices, request, listener, metadataMappingService);
} catch (IndexNotFoundException ex) {
logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}]",
Expand Down Expand Up @@ -142,4 +160,21 @@ public void onFailure(Exception t) {
});
}

private List<String> checkForSystemIndexViolations(Index[] concreteIndices, String requestMappings) {
List<String> violations = new ArrayList<>();

for (Index index : concreteIndices) {
final SystemIndexDescriptor descriptor = systemIndices.findMatchingDescriptor(index.getName());
if (descriptor != null && descriptor.isAutomaticallyManaged()) {
final String descriptorMappings = descriptor.getMappings();

// Technically we could trip over a difference in whitespace here, but then again nobody should be trying to manually
// update a descriptor's mappings.
if (descriptorMappings.equals(requestMappings) == false) {
violations.add(index.getName());
}
}
}
return violations;
}
}
Loading