Skip to content

Commit

Permalink
Add infrastructure for managing system indices (#65970)
Browse files Browse the repository at this point in the history
Backport of #65604.

Part of #61656.

Add the necessary support for automatically creating and updating system
indices. This works by making it possible to create a system index
descriptor with all the information needed to manage the mappings,
settings and aliases.

Follow-up work will opt existing indices into this framework.
  • Loading branch information
pugnascotia authored Dec 9, 2020
1 parent f15ce0e commit 3c9cf2b
Show file tree
Hide file tree
Showing 16 changed files with 1,343 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@
*/
package org.elasticsearch.action.admin.indices.create;

import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.ActiveShardsObserver;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.AutoCreateIndex;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterState;
Expand All @@ -40,17 +46,21 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.util.concurrent.atomic.AtomicReference;

/**
* Api that auto creates an index or data stream that originate from requests that write into an index that doesn't yet exist.
*/
public final class AutoCreateAction extends ActionType<CreateIndexResponse> {

private static final Logger logger = LogManager.getLogger(AutoCreateAction.class);

public static final AutoCreateAction INSTANCE = new AutoCreateAction();
public static final String NAME = "indices:admin/auto_create";

Expand All @@ -64,15 +74,17 @@ public static final class TransportAction extends TransportMasterNodeAction<Crea
private final MetadataCreateIndexService createIndexService;
private final MetadataCreateDataStreamService metadataCreateDataStreamService;
private final AutoCreateIndex autoCreateIndex;
private final SystemIndices systemIndices;

@Inject
public TransportAction(TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
MetadataCreateIndexService createIndexService,
MetadataCreateDataStreamService metadataCreateDataStreamService,
AutoCreateIndex autoCreateIndex) {
AutoCreateIndex autoCreateIndex, SystemIndices systemIndices) {
super(NAME, transportService, clusterService, threadPool, actionFilters, CreateIndexRequest::new, indexNameExpressionResolver,
CreateIndexResponse::new, ThreadPool.Names.SAME);
this.systemIndices = systemIndices;
this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
this.createIndexService = createIndexService;
this.metadataCreateDataStreamService = metadataCreateDataStreamService;
Expand Down Expand Up @@ -139,12 +151,51 @@ public ClusterState execute(ClusterState currentState) throws Exception {
return currentState;
}

CreateIndexClusterStateUpdateRequest updateRequest =
new CreateIndexClusterStateUpdateRequest(request.cause(), indexName, request.index())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout());
final SystemIndexDescriptor descriptor = systemIndices.findMatchingDescriptor(indexName);
CreateIndexClusterStateUpdateRequest updateRequest = descriptor != null && descriptor.isAutomaticallyManaged()
? buildSystemIndexUpdateRequest(descriptor)
: buildUpdateRequest(indexName);

return createIndexService.applyCreateIndexRequest(currentState, updateRequest, false);
}
}

private CreateIndexClusterStateUpdateRequest buildUpdateRequest(String indexName) {
CreateIndexClusterStateUpdateRequest updateRequest =
new CreateIndexClusterStateUpdateRequest(request.cause(), indexName, request.index())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout());
logger.debug("Auto-creating index {}", indexName);
return updateRequest;
}

private CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateRequest(SystemIndexDescriptor descriptor) {
String mappings = descriptor.getMappings();
Settings settings = descriptor.getSettings();
String aliasName = descriptor.getAliasName();
String concreteIndexName = descriptor.getPrimaryIndex();

CreateIndexClusterStateUpdateRequest updateRequest =
new CreateIndexClusterStateUpdateRequest(request.cause(), concreteIndexName, request.index())
.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout());

updateRequest.waitForActiveShards(ActiveShardCount.ALL);

if (mappings != null) {
updateRequest.mappings(Collections.singletonMap(MapperService.SINGLE_MAPPING_NAME, mappings));
}
if (settings != null) {
updateRequest.settings(settings);
}
if (aliasName != null) {
updateRequest.aliases(Collections.singleton(new Alias(aliasName)));
}

logger.debug("Auto-creating system index {}", concreteIndexName);

return updateRequest;
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@

package org.elasticsearch.action.admin.indices.create;

import java.util.Collections;
import java.util.Set;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
Expand All @@ -29,6 +34,10 @@
import org.elasticsearch.cluster.metadata.MetadataCreateIndexService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand All @@ -38,14 +47,17 @@
public class TransportCreateIndexAction extends TransportMasterNodeAction<CreateIndexRequest, CreateIndexResponse> {

private final MetadataCreateIndexService createIndexService;
private final SystemIndices systemIndices;

@Inject
public TransportCreateIndexAction(TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, MetadataCreateIndexService createIndexService,
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
SystemIndices systemIndices) {
super(CreateIndexAction.NAME, transportService, clusterService, threadPool, actionFilters, CreateIndexRequest::new,
indexNameExpressionResolver, CreateIndexResponse::new, ThreadPool.Names.SAME);
this.createIndexService = createIndexService;
this.systemIndices = systemIndices;
}

@Override
Expand All @@ -57,20 +69,58 @@ protected ClusterBlockException checkBlock(CreateIndexRequest request, ClusterSt
protected void masterOperation(final CreateIndexRequest request, final ClusterState state,
final ActionListener<CreateIndexResponse> listener) {
String cause = request.cause();
if (cause.length() == 0) {
if (cause.isEmpty()) {
cause = "api";
}

final String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
final CreateIndexClusterStateUpdateRequest updateRequest =
new CreateIndexClusterStateUpdateRequest(cause, indexName, request.index())
.ackTimeout(request.timeout()).masterNodeTimeout(request.masterNodeTimeout())
.settings(request.settings()).mappings(request.mappings())
.aliases(request.aliases())
.waitForActiveShards(request.waitForActiveShards());

final SystemIndexDescriptor descriptor = systemIndices.findMatchingDescriptor(indexName);
final CreateIndexClusterStateUpdateRequest updateRequest = descriptor != null && descriptor.isAutomaticallyManaged()
? buildSystemIndexUpdateRequest(request, cause, descriptor)
: buildUpdateRequest(request, cause, indexName);

createIndexService.createIndex(updateRequest, listener.map(response ->
new CreateIndexResponse(response.isAcknowledged(), response.isShardsAcknowledged(), indexName)));
}

private CreateIndexClusterStateUpdateRequest buildUpdateRequest(CreateIndexRequest request, String cause, String indexName) {
return new CreateIndexClusterStateUpdateRequest(cause, indexName, request.index()).ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout())
.settings(request.settings())
.mappings(request.mappings())
.aliases(request.aliases())
.waitForActiveShards(request.waitForActiveShards());
}

private CreateIndexClusterStateUpdateRequest buildSystemIndexUpdateRequest(
CreateIndexRequest request,
String cause,
SystemIndexDescriptor descriptor
) {
Settings settings = descriptor.getSettings();
if (settings == null) {
settings = Settings.EMPTY;
}

final Set<Alias> aliases;
if (descriptor.getAliasName() == null) {
aliases = Collections.emptySet();
} else {
aliases = Collections.singleton(new Alias(descriptor.getAliasName()));
}

final CreateIndexClusterStateUpdateRequest updateRequest = new CreateIndexClusterStateUpdateRequest(
cause,
descriptor.getPrimaryIndex(),
request.index()
);

return updateRequest.ackTimeout(request.timeout())
.masterNodeTimeout(request.masterNodeTimeout())
.aliases(aliases)
.waitForActiveShards(ActiveShardCount.ALL)
.mappings(Collections.singletonMap(MapperService.SINGLE_MAPPING_NAME, descriptor.getMappings()))
.settings(settings);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.indices.SystemIndexDescriptor;
import org.elasticsearch.indices.SystemIndices;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand All @@ -54,6 +56,7 @@ public class TransportPutMappingAction extends AcknowledgedTransportMasterNodeAc

private final MetadataMappingService metadataMappingService;
private final RequestValidators<PutMappingRequest> requestValidators;
private final SystemIndices systemIndices;

@Inject
public TransportPutMappingAction(
Expand All @@ -63,11 +66,13 @@ public TransportPutMappingAction(
final MetadataMappingService metadataMappingService,
final ActionFilters actionFilters,
final IndexNameExpressionResolver indexNameExpressionResolver,
final RequestValidators<PutMappingRequest> requestValidators) {
final RequestValidators<PutMappingRequest> requestValidators,
final SystemIndices systemIndices) {
super(PutMappingAction.NAME, transportService, clusterService, threadPool, actionFilters, PutMappingRequest::new,
indexNameExpressionResolver, ThreadPool.Names.SAME);
this.metadataMappingService = metadataMappingService;
this.requestValidators = Objects.requireNonNull(requestValidators);
this.systemIndices = systemIndices;
}

@Override
Expand All @@ -86,12 +91,25 @@ protected void masterOperation(final PutMappingRequest request, final ClusterSta
final ActionListener<AcknowledgedResponse> listener) {
try {
final Index[] concreteIndices = resolveIndices(state, request, indexNameExpressionResolver);
final String mappingSource = request.source();

final Optional<Exception> maybeValidationException = requestValidators.validateRequest(request, state, concreteIndices);
if (maybeValidationException.isPresent()) {
listener.onFailure(maybeValidationException.get());
return;
}

final List<String> violations = checkForSystemIndexViolations(concreteIndices, mappingSource);
if (violations.isEmpty() == false) {
final String message = "Cannot update mappings in "
+ violations
+ ": system indices can only use mappings from their descriptors, "
+ "but the mappings in the request did not match those in the descriptors(s)";
logger.warn(message);
listener.onFailure(new IllegalArgumentException(message));
return;
}

performMappingUpdate(concreteIndices, request, listener, metadataMappingService);
} catch (IndexNotFoundException ex) {
logger.debug(() -> new ParameterizedMessage("failed to put mappings on indices [{}], type [{}]",
Expand Down Expand Up @@ -142,4 +160,21 @@ public void onFailure(Exception t) {
});
}

private List<String> checkForSystemIndexViolations(Index[] concreteIndices, String requestMappings) {
List<String> violations = new ArrayList<>();

for (Index index : concreteIndices) {
final SystemIndexDescriptor descriptor = systemIndices.findMatchingDescriptor(index.getName());
if (descriptor != null && descriptor.isAutomaticallyManaged()) {
final String descriptorMappings = descriptor.getMappings();

// Technically we could trip over a difference in whitespace here, but then again nobody should be trying to manually
// update a descriptor's mappings.
if (descriptorMappings.equals(requestMappings) == false) {
violations.add(index.getName());
}
}
}
return violations;
}
}
Loading

0 comments on commit 3c9cf2b

Please sign in to comment.