From af937b14ae665a8230a2a7f14377a0e86b9ff757 Mon Sep 17 00:00:00 2001 From: Albert Zaharovits Date: Thu, 25 Jul 2019 16:52:21 +0300 Subject: [PATCH] SecurityIndexManager handle RuntimeEx while reading mapping (#44409) Fixes exception handling while reading and parsing `.security-*` mappings and templates. --- .../support/SecurityIndexManager.java | 120 +++++++++--------- .../support/SecurityIndexManagerTests.java | 21 +++ 2 files changed, 82 insertions(+), 59 deletions(-) diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java index a397735096225..2c6f3902b3877 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/support/SecurityIndexManager.java @@ -117,7 +117,8 @@ private SecurityIndexManager(Client client, ClusterService clusterService, Strin clusterService.addListener(this); } - private SecurityIndexManager(Client client, String aliasName, String internalIndexName, int internalIndexFormat, + // protected for testing + protected SecurityIndexManager(Client client, String aliasName, String internalIndexName, int internalIndexFormat, Supplier mappingSourceSupplier, State indexState) { this.aliasName = aliasName; this.internalIndexName = internalIndexName; @@ -351,65 +352,68 @@ public void checkIndexVersionThenExecute(final Consumer consumer, fin */ public void prepareIndexIfNeededThenExecute(final Consumer consumer, final Runnable andThen) { final State indexState = this.indexState; // use a local copy so all checks execute against the same state! - // TODO we should improve this so we don't fire off a bunch of requests to do the same thing (create or update mappings) - if (indexState == State.UNRECOVERED_STATE) { - consumer.accept(new ElasticsearchStatusException( - "Cluster state has not been recovered yet, cannot write to the [" + indexState.concreteIndexName + "] index", - RestStatus.SERVICE_UNAVAILABLE)); - } else if (indexState.indexExists() && indexState.isIndexUpToDate == false) { - consumer.accept(new IllegalStateException( - "Index [" + indexState.concreteIndexName + "] is not on the current version." - + "Security features relying on the index will not be available until the upgrade API is run on the index")); - } else if (indexState.indexExists() == false) { - assert indexState.concreteIndexName != null; - logger.info("security index does not exist. Creating [{}] with alias [{}]", indexState.concreteIndexName, this.aliasName); - final byte[] mappingSource = mappingSourceSupplier.get(); - final Tuple mappingAndSettings = parseMappingAndSettingsFromTemplateBytes(mappingSource); - CreateIndexRequest request = new CreateIndexRequest(indexState.concreteIndexName) - .alias(new Alias(this.aliasName)) - .mapping(MapperService.SINGLE_MAPPING_NAME, mappingAndSettings.v1(), XContentType.JSON) - .waitForActiveShards(ActiveShardCount.ALL) - .settings(mappingAndSettings.v2()); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request, - new ActionListener() { - @Override - public void onResponse(CreateIndexResponse createIndexResponse) { - if (createIndexResponse.isAcknowledged()) { - andThen.run(); - } else { - consumer.accept(new ElasticsearchException("Failed to create security index")); - } - } - - @Override - public void onFailure(Exception e) { - final Throwable cause = ExceptionsHelper.unwrapCause(e); - if (cause instanceof ResourceAlreadyExistsException) { - // the index already exists - it was probably just created so this - // node hasn't yet received the cluster state update with the index - andThen.run(); - } else { - consumer.accept(e); - } + try { + // TODO we should improve this so we don't fire off a bunch of requests to do the same thing (create or update mappings) + if (indexState == State.UNRECOVERED_STATE) { + throw new ElasticsearchStatusException( + "Cluster state has not been recovered yet, cannot write to the [" + indexState.concreteIndexName + "] index", + RestStatus.SERVICE_UNAVAILABLE); + } else if (indexState.indexExists() && indexState.isIndexUpToDate == false) { + throw new IllegalStateException("Index [" + indexState.concreteIndexName + "] is not on the current version." + + "Security features relying on the index will not be available until the upgrade API is run on the index"); + } else if (indexState.indexExists() == false) { + assert indexState.concreteIndexName != null; + logger.info("security index does not exist. Creating [{}] with alias [{}]", indexState.concreteIndexName, this.aliasName); + final byte[] mappingSource = mappingSourceSupplier.get(); + final Tuple mappingAndSettings = parseMappingAndSettingsFromTemplateBytes(mappingSource); + CreateIndexRequest request = new CreateIndexRequest(indexState.concreteIndexName) + .alias(new Alias(this.aliasName)) + .mapping(MapperService.SINGLE_MAPPING_NAME, mappingAndSettings.v1(), XContentType.JSON) + .waitForActiveShards(ActiveShardCount.ALL) + .settings(mappingAndSettings.v2()); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request, + new ActionListener() { + @Override + public void onResponse(CreateIndexResponse createIndexResponse) { + if (createIndexResponse.isAcknowledged()) { + andThen.run(); + } else { + consumer.accept(new ElasticsearchException("Failed to create security index")); } - }, client.admin().indices()::create); - } else if (indexState.mappingUpToDate == false) { - logger.info("Index [{}] (alias [{}]) is not up to date. Updating mapping", indexState.concreteIndexName, this.aliasName); - final byte[] mappingSource = mappingSourceSupplier.get(); - final Tuple mappingAndSettings = parseMappingAndSettingsFromTemplateBytes(mappingSource); - PutMappingRequest request = new PutMappingRequest(indexState.concreteIndexName) - .source(mappingAndSettings.v1(), XContentType.JSON) - .type(MapperService.SINGLE_MAPPING_NAME); - executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request, - ActionListener.wrap(putMappingResponse -> { - if (putMappingResponse.isAcknowledged()) { + } + + @Override + public void onFailure(Exception e) { + final Throwable cause = ExceptionsHelper.unwrapCause(e); + if (cause instanceof ResourceAlreadyExistsException) { + // the index already exists - it was probably just created so this + // node hasn't yet received the cluster state update with the index andThen.run(); } else { - consumer.accept(new IllegalStateException("put mapping request was not acknowledged")); + consumer.accept(e); } - }, consumer), client.admin().indices()::putMapping); - } else { - andThen.run(); + } + }, client.admin().indices()::create); + } else if (indexState.mappingUpToDate == false) { + logger.info("Index [{}] (alias [{}]) is not up to date. Updating mapping", indexState.concreteIndexName, this.aliasName); + final byte[] mappingSource = mappingSourceSupplier.get(); + final Tuple mappingAndSettings = parseMappingAndSettingsFromTemplateBytes(mappingSource); + PutMappingRequest request = new PutMappingRequest(indexState.concreteIndexName) + .source(mappingAndSettings.v1(), XContentType.JSON) + .type(MapperService.SINGLE_MAPPING_NAME); + executeAsyncWithOrigin(client.threadPool().getThreadContext(), SECURITY_ORIGIN, request, + ActionListener.wrap(putMappingResponse -> { + if (putMappingResponse.isAcknowledged()) { + andThen.run(); + } else { + consumer.accept(new IllegalStateException("put mapping request was not acknowledged")); + } + }, consumer), client.admin().indices()::putMapping); + } else { + andThen.run(); + } + } catch (Exception e) { + consumer.accept(e); } } @@ -433,7 +437,7 @@ private static byte[] readTemplateAsBytes(String templateName) { SecurityIndexManager.TEMPLATE_VERSION_PATTERN).getBytes(StandardCharsets.UTF_8); } - private static Tuple parseMappingAndSettingsFromTemplateBytes(byte[] template) { + private static Tuple parseMappingAndSettingsFromTemplateBytes(byte[] template) throws IOException { final PutIndexTemplateRequest request = new PutIndexTemplateRequest("name_is_not_important").source(template, XContentType.JSON); final String mappingSource = request.mappings().get(MapperService.SINGLE_MAPPING_NAME); try (XContentParser parser = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, @@ -446,8 +450,6 @@ private static Tuple parseMappingAndSettingsFromTemplateBytes( XContentBuilder builder = JsonXContent.contentBuilder(); builder.generator().copyCurrentStructure(parser); return new Tuple<>(Strings.toString(builder), request.settings()); - } catch (IOException e) { - throw ExceptionsHelper.convertToRuntime(e); } } diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/support/SecurityIndexManagerTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/support/SecurityIndexManagerTests.java index 5cf819c8f6034..6396757eb482a 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/support/SecurityIndexManagerTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/support/SecurityIndexManagerTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.security.support; import java.io.IOException; +import java.time.Instant; import java.util.Arrays; import java.util.HashMap; import java.util.LinkedHashMap; @@ -14,6 +15,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.function.Supplier; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; @@ -67,6 +69,8 @@ import static org.hamcrest.Matchers.nullValue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; public class SecurityIndexManagerTests extends ESTestCase { @@ -97,6 +101,23 @@ void doExecute(ActionType action, Request request, ActionListener mappingSourceSupplier = () -> { + throw new RuntimeException(); + }; + Runnable runnable = mock(Runnable.class); + manager = new SecurityIndexManager(mock(Client.class), RestrictedIndicesNames.SECURITY_MAIN_ALIAS, + RestrictedIndicesNames.INTERNAL_SECURITY_MAIN_INDEX_7, SecurityIndexManager.INTERNAL_MAIN_INDEX_FORMAT, + mappingSourceSupplier, state); + AtomicReference exceptionConsumer = new AtomicReference<>(); + manager.prepareIndexIfNeededThenExecute(e -> exceptionConsumer.set(e), runnable); + verify(runnable, never()).run(); + assertThat(exceptionConsumer.get(), is(notNullValue())); } public void testIndexWithUpToDateMappingAndTemplate() throws IOException {