diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementHistory.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementHistory.kt index 78e137544..9e1be6aaa 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementHistory.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementHistory.kt @@ -118,7 +118,7 @@ class IndexStateManagementHistory( // We have to pass null for newIndexName in order to get Elastic to increment the index count. val request = RolloverRequest(IndexStateManagementIndices.HISTORY_WRITE_INDEX_ALIAS, null) request.createIndexRequest.index(IndexStateManagementIndices.HISTORY_INDEX_PATTERN) - .mapping(_DOC, indexStateManagementIndices.indexStateManagementHistoryMappings, XContentType.JSON) + .mapping(_DOC, IndexStateManagementIndices.indexStateManagementHistoryMappings, XContentType.JSON) request.addMaxIndexDocsCondition(historyMaxDocs) request.addMaxIndexAgeCondition(historyMaxAge) val response = client.admin().indices().rolloversIndex(request).actionGet() diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementIndices.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementIndices.kt index b83155939..cbf7a6209 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementIndices.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementIndices.kt @@ -17,6 +17,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin.Companion.INDEX_STATE_MANAGEMENT_INDEX import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil +import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.IndexUtils import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.OpenForTesting import com.amazon.opendistroforelasticsearch.indexstatemanagement.util._DOC import org.apache.logging.log4j.LogManager @@ -27,6 +28,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest import org.elasticsearch.action.admin.indices.create.CreateIndexResponse import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse +import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.Client import org.elasticsearch.client.IndicesAdminClient import org.elasticsearch.cluster.service.ClusterService @@ -40,15 +42,21 @@ class IndexStateManagementIndices( private val logger = LogManager.getLogger(javaClass) - val indexStateManagementMappings = javaClass.classLoader.getResource("mappings/opendistro-ism-config.json").readText() - - val indexStateManagementHistoryMappings = javaClass.classLoader.getResource("mappings/opendistro-ism-history.json").readText() - - fun initIndexStateManagementIndex(actionListener: ActionListener) { + fun checkAndUpdateISMConfigIndex(actionListener: ActionListener) { if (!indexStateManagementIndexExists()) { val indexRequest = CreateIndexRequest(INDEX_STATE_MANAGEMENT_INDEX) .mapping(_DOC, indexStateManagementMappings, XContentType.JSON) - client.create(indexRequest, actionListener) + client.create(indexRequest, object : ActionListener { + override fun onFailure(e: Exception) { + actionListener.onFailure(e) + } + + override fun onResponse(response: CreateIndexResponse) { + actionListener.onResponse(response) + } + }) + } else { + IndexUtils.checkAndUpdateConfigIndexMapping(clusterService.state(), client, actionListener) } } @@ -62,7 +70,7 @@ class IndexStateManagementIndices( if (indexStateManagementIndexExists()) return true return try { - val response: CreateIndexResponse = client.suspendUntil { initIndexStateManagementIndex(it) } + val response: AcknowledgedResponse = client.suspendUntil { checkAndUpdateISMConfigIndex(it) } if (response.isAcknowledged) { return true } @@ -118,5 +126,8 @@ class IndexStateManagementIndices( const val HISTORY_WRITE_INDEX_ALIAS = "$HISTORY_INDEX_BASE-write" const val HISTORY_INDEX_PATTERN = "<$HISTORY_INDEX_BASE-{now/d{yyyy.MM.dd}}-1>" const val HISTORY_ALL = "$HISTORY_INDEX_BASE*" + + val indexStateManagementMappings = javaClass.classLoader.getResource("mappings/opendistro-ism-config.json").readText() + val indexStateManagementHistoryMappings = javaClass.classLoader.getResource("mappings/opendistro-ism-history.json").readText() } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/Policy.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/Policy.kt index 27191fd4d..5ab3bc8cb 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/Policy.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/Policy.kt @@ -17,6 +17,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.model import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.instant import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.optionalTimeField +import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.IndexUtils import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.WITH_TYPE import org.elasticsearch.common.xcontent.ToXContent import org.elasticsearch.common.xcontent.ToXContentObject @@ -97,7 +98,7 @@ data class Policy( var defaultState: String? = null var errorNotification: ErrorNotification? = null var lastUpdatedTime: Instant? = null - var schemaVersion: Long = 1 + var schemaVersion: Long = IndexUtils.DEFAULT_SCHEMA_VERSION val states: MutableList = mutableListOf() ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation) diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestChangePolicyAction.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestChangePolicyAction.kt index ebb055ebb..d3d393c4c 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestChangePolicyAction.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestChangePolicyAction.kt @@ -26,6 +26,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.coordina import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.FAILED_INDICES import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.FAILURES import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.FailedIndex +import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.IndexUtils import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.UPDATED_INDICES import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.isSafeToChange import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.updateManagedIndexRequest @@ -40,6 +41,7 @@ import org.elasticsearch.action.get.GetResponse import org.elasticsearch.action.search.SearchRequest import org.elasticsearch.action.search.SearchResponse import org.elasticsearch.action.support.IndicesOptions +import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.node.NodeClient import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.Strings @@ -107,6 +109,7 @@ class RestChangePolicyAction( private val managedIndexUuids = mutableListOf>() private val indexUuidToCurrentState = mutableMapOf() lateinit var policy: Policy + lateinit var response: GetResponse fun start() { val getRequest = GetRequest(INDEX_STATE_MANAGEMENT_INDEX, changePolicy.policyID) @@ -118,7 +121,19 @@ class RestChangePolicyAction( if (!response.isExists || response.isSourceEmpty) { return channel.sendResponse(BytesRestResponse(RestStatus.NOT_FOUND, "Could not find policy=${changePolicy.policyID}")) } + this.response = response + IndexUtils.checkAndUpdateConfigIndexMapping( + clusterService.state(), + client.admin().indices(), + ActionListener.wrap(::onUpdateMapping, ::onFailure) + ) + } + private fun onUpdateMapping(acknowledgedResponse: AcknowledgedResponse) { + if (!acknowledgedResponse.isAcknowledged) { + return channel.sendResponse(BytesRestResponse(RestStatus.FAILED_DEPENDENCY, + "Could not update $INDEX_STATE_MANAGEMENT_INDEX with new mapping.")) + } policy = XContentHelper.createParser( channel.request().xContentRegistry, LoggingDeprecationHandler.INSTANCE, diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestIndexPolicyAction.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestIndexPolicyAction.kt index ebeadb4d5..e63bf4521 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestIndexPolicyAction.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestIndexPolicyAction.kt @@ -23,6 +23,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy.C import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings.Companion.ALLOW_LIST import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.IF_PRIMARY_TERM import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.IF_SEQ_NO +import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.IndexUtils import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.REFRESH import com.amazon.opendistroforelasticsearch.indexstatemanagement.util._ID import com.amazon.opendistroforelasticsearch.indexstatemanagement.util._PRIMARY_TERM @@ -32,10 +33,10 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getDisall import org.apache.logging.log4j.LogManager import org.elasticsearch.action.ActionListener import org.elasticsearch.action.DocWriteRequest -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse import org.elasticsearch.action.index.IndexRequest import org.elasticsearch.action.index.IndexResponse import org.elasticsearch.action.support.WriteRequest +import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.node.NodeClient import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.settings.Settings @@ -118,19 +119,15 @@ class RestIndexPolicyAction( ) : AsyncActionHandler(client, channel) { fun start() { - if (!ismIndices.indexStateManagementIndexExists()) { - ismIndices.initIndexStateManagementIndex(ActionListener.wrap(::onCreateMappingsResponse, ::onFailure)) - } else { - putPolicy() - } + ismIndices.checkAndUpdateISMConfigIndex(ActionListener.wrap(::onCreateMappingsResponse, ::onFailure)) } - private fun onCreateMappingsResponse(response: CreateIndexResponse) { + private fun onCreateMappingsResponse(response: AcknowledgedResponse) { if (response.isAcknowledged) { - log.info("Created $INDEX_STATE_MANAGEMENT_INDEX with mappings.") + log.info("Successfully created or updated $INDEX_STATE_MANAGEMENT_INDEX with newest mappings.") putPolicy() } else { - log.error("Create $INDEX_STATE_MANAGEMENT_INDEX mappings call not acknowledged.") + log.error("Unable to create or update $INDEX_STATE_MANAGEMENT_INDEX with newest mapping.") channel.sendResponse( BytesRestResponse( RestStatus.INTERNAL_SERVER_ERROR, @@ -140,6 +137,8 @@ class RestIndexPolicyAction( } private fun putPolicy() { + newPolicy.copy(schemaVersion = IndexUtils.indexManagementSchemaVersion) + val indexRequest = IndexRequest(INDEX_STATE_MANAGEMENT_INDEX) .setRefreshPolicy(refreshPolicy) .source(newPolicy.toXContent(channel.newBuilder())) diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestRetryFailedManagedIndexAction.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestRetryFailedManagedIndexAction.kt index 35258695d..3e8d02075 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestRetryFailedManagedIndexAction.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestRetryFailedManagedIndexAction.kt @@ -154,7 +154,12 @@ class RestRetryFailedManagedIndexAction( Pair(Index(managedIndexMetaData.index, managedIndexMetaData.indexUuid), managedIndexMetaData.copy( stepMetaData = null, policyRetryInfo = PolicyRetryInfoMetaData(false, 0), - actionMetaData = managedIndexMetaData.actionMetaData?.copy(failed = false, consumedRetries = 0, lastRetryTime = null, startTime = null), + actionMetaData = managedIndexMetaData.actionMetaData?.copy( + failed = false, + consumedRetries = 0, + lastRetryTime = null, + startTime = null + ), transitionTo = startState, info = mapOf("message" to "Attempting to retry") )) diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/IndexUtils.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/IndexUtils.kt new file mode 100644 index 000000000..35b8c66cb --- /dev/null +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/IndexUtils.kt @@ -0,0 +1,128 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.util + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementIndices +import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin +import org.apache.logging.log4j.LogManager +import org.elasticsearch.action.ActionListener +import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest +import org.elasticsearch.action.support.master.AcknowledgedResponse +import org.elasticsearch.client.IndicesAdminClient +import org.elasticsearch.cluster.ClusterState +import org.elasticsearch.cluster.metadata.IndexMetaData +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler +import org.elasticsearch.common.xcontent.NamedXContentRegistry +import org.elasticsearch.common.xcontent.XContentParser +import org.elasticsearch.common.xcontent.XContentType + +class IndexUtils { + companion object { + @Suppress("ObjectPropertyNaming") + const val _META = "_meta" + const val SCHEMA_VERSION = "schema_version" + const val DEFAULT_SCHEMA_VERSION = 1L + val logger = LogManager.getLogger(IndexUtils::class.java) + + var indexManagementSchemaVersion: Long + private set + var indexManagementHistoryVersion: Long + private set + + init { + indexManagementSchemaVersion = getSchemaVersion(IndexStateManagementIndices.indexStateManagementMappings) + indexManagementHistoryVersion = getSchemaVersion(IndexStateManagementIndices.indexStateManagementHistoryMappings) + } + + fun getSchemaVersion(mapping: String): Long { + val xcp = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, mapping) + + while (!xcp.isClosed) { + val token = xcp.currentToken() + if (token != null && token != XContentParser.Token.END_OBJECT && token != XContentParser.Token.START_OBJECT) { + if (xcp.currentName() != _META) { + xcp.nextToken() + xcp.skipChildren() + } else { + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + when (xcp.currentName()) { + SCHEMA_VERSION -> { + val version = xcp.longValue() + require(version > -1) + return version + } + else -> xcp.nextToken() + } + } + } + } + xcp.nextToken() + } + return DEFAULT_SCHEMA_VERSION + } + + fun shouldUpdateIndex(index: IndexMetaData, newVersion: Long): Boolean { + var oldVersion = DEFAULT_SCHEMA_VERSION + + val indexMapping = index.mapping()?.sourceAsMap() + if (indexMapping != null && indexMapping.containsKey(_META) && indexMapping[_META] is HashMap<*, *>) { + val metaData = indexMapping[_META] as HashMap<*, *> + if (metaData.containsKey(SCHEMA_VERSION)) { + oldVersion = (metaData[SCHEMA_VERSION] as Int).toLong() + } + } + return newVersion > oldVersion + } + + fun checkAndUpdateConfigIndexMapping( + clusterState: ClusterState, + client: IndicesAdminClient, + actionListener: ActionListener + ) { + checkAndUpdateIndexMapping( + IndexStateManagementPlugin.INDEX_STATE_MANAGEMENT_INDEX, + indexManagementSchemaVersion, + IndexStateManagementIndices.indexStateManagementMappings, + clusterState, + client, + actionListener + ) + } + + @OpenForTesting + fun checkAndUpdateIndexMapping( + index: String, + schemaVersion: Long, + mapping: String, + clusterState: ClusterState, + client: IndicesAdminClient, + actionListener: ActionListener + ) { + if (clusterState.metaData.indices.containsKey(index)) { + if (shouldUpdateIndex(clusterState.metaData.indices[index], schemaVersion)) { + val putMappingRequest: PutMappingRequest = PutMappingRequest(index).type(_DOC).source(mapping, XContentType.JSON) + client.putMapping(putMappingRequest, actionListener) + } else { + actionListener.onResponse(AcknowledgedResponse(true)) + } + } else { + logger.error("IndexMetaData does not exist for $index") + actionListener.onResponse(AcknowledgedResponse(false)) + } + } + } +} diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/RestHandlerUtils.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/RestHandlerUtils.kt index 43ed25036..5bf4d3532 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/RestHandlerUtils.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/RestHandlerUtils.kt @@ -13,8 +13,7 @@ * permissions and limitations under the License. */ -@file:Suppress("TopLevelPropertyNaming") - +@file:Suppress("TopLevelPropertyNaming", "MatchingDeclarationName") package com.amazon.opendistroforelasticsearch.indexstatemanagement.util import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.optionalTimeField @@ -38,9 +37,6 @@ const val REFRESH = "refresh" const val WITH_TYPE = "with_type" val XCONTENT_WITHOUT_TYPE = ToXContent.MapParams(mapOf(WITH_TYPE to "false")) -const val SCHEMA_VERSION = "schema_version" -const val NO_SCHEMA_VERSION = 0 - const val FAILURES = "failures" const val FAILED_INDICES = "failed_indices" const val UPDATED_INDICES = "updated_indices" diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index e431718f7..42b2212bc 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -1,4 +1,7 @@ { + "_meta" : { + "schema_version": 2 + }, "dynamic": "strict", "properties": { "policy": { diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementIndicesIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementIndicesIT.kt new file mode 100644 index 000000000..ac19f5834 --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementIndicesIT.kt @@ -0,0 +1,81 @@ +package com.amazon.opendistroforelasticsearch.indexstatemanagement + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementIndices.Companion.indexStateManagementMappings +import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin.Companion.INDEX_STATE_MANAGEMENT_INDEX +import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin.Companion.POLICY_BASE_URI +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ChangePolicy +import com.amazon.opendistroforelasticsearch.indexstatemanagement.resthandler.RestChangePolicyAction +import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.FAILED_INDICES +import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.FAILURES +import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.UPDATED_INDICES +import org.apache.http.entity.ContentType +import org.apache.http.entity.StringEntity +import org.elasticsearch.common.settings.Settings +import org.elasticsearch.rest.RestRequest +import org.elasticsearch.test.ESTestCase +import java.util.Locale + +class IndexStateManagementIndicesIT : IndexStateManagementRestTestCase() { + + private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT) + + fun `test create index management`() { + val policy = randomPolicy() + val policyId = ESTestCase.randomAlphaOfLength(10) + client().makeRequest("PUT", "$POLICY_BASE_URI/$policyId", emptyMap(), policy.toHttpEntity()) + assertIndexExists(INDEX_STATE_MANAGEMENT_INDEX) + verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 2) + } + + fun `test update management index mapping with new schema version`() { + assertIndexDoesNotExist(INDEX_STATE_MANAGEMENT_INDEX) + + val mapping = indexStateManagementMappings.trimStart('{').trimEnd('}') + .replace("\"schema_version\": 2", "\"schema_version\": 0") + + createIndex(INDEX_STATE_MANAGEMENT_INDEX, Settings.EMPTY, mapping) + assertIndexExists(INDEX_STATE_MANAGEMENT_INDEX) + verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 0) + client().makeRequest("DELETE", "*") + + val policy = randomPolicy() + val policyId = ESTestCase.randomAlphaOfLength(10) + client().makeRequest("PUT", "$POLICY_BASE_URI/$policyId", emptyMap(), policy.toHttpEntity()) + + assertIndexExists(INDEX_STATE_MANAGEMENT_INDEX) + verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 2) + } + + fun `test changing policy on an index that hasn't initialized yet check schema version`() { + val policy = createRandomPolicy() + val newPolicy = createPolicy(randomPolicy(), "new_policy", true) + val indexName = "${testIndexName}_computer" + val (index) = createIndex(indexName, policy.id) + + val managedIndexConfig = getExistingManagedIndexConfig(index) + assertNull("Change policy is not null", managedIndexConfig.changePolicy) + assertNull("Policy has already initialized", managedIndexConfig.policy) + assertEquals("Policy id does not match", policy.id, managedIndexConfig.policyID) + + val mapping = "{" + indexStateManagementMappings.trimStart('{').trimEnd('}') + .replace("\"schema_version\": 2", "\"schema_version\": 0") + + val entity = StringEntity(mapping, ContentType.APPLICATION_JSON) + client().makeRequest(RestRequest.Method.PUT.toString(), + "/$INDEX_STATE_MANAGEMENT_INDEX/_mapping", emptyMap(), entity) + + verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 0) + + // if we try to change policy now, it'll have no ManagedIndexMetaData yet and should succeed + val changePolicy = ChangePolicy(newPolicy.id, null, emptyList(), false) + val response = client().makeRequest( + RestRequest.Method.POST.toString(), + "${RestChangePolicyAction.CHANGE_POLICY_BASE_URI}/$index", emptyMap(), changePolicy.toHttpEntity()) + + verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 2) + + assertAffectedIndicesResponseIsEqual(mapOf(FAILURES to false, FAILED_INDICES to emptyList(), UPDATED_INDICES to 1), response.asMap()) + + waitFor { assertEquals(newPolicy.id, getManagedIndexConfig(index)?.changePolicy?.policyID) } + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt index 546eec84c..2cc45ff76 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt @@ -44,6 +44,7 @@ import org.apache.http.message.BasicHeader import org.elasticsearch.action.search.SearchResponse import org.elasticsearch.client.Request import org.elasticsearch.client.Response +import org.elasticsearch.client.RestClient import org.elasticsearch.cluster.metadata.IndexMetaData import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.unit.TimeValue @@ -508,4 +509,38 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() { } return true } + + protected fun assertIndexExists(index: String) { + val response = client().makeRequest("HEAD", index) + assertEquals("Index $index does not exist.", RestStatus.OK, response.restStatus()) + } + + protected fun assertIndexDoesNotExist(index: String) { + val response = client().makeRequest("HEAD", index) + assertEquals("Index $index does exist.", RestStatus.NOT_FOUND, response.restStatus()) + } + + protected fun verifyIndexSchemaVersion(index: String, expectedVersion: Int) { + val indexMapping = client().getIndexMapping(index) + val indexName = indexMapping.keys.toList()[0] + val mappings = indexMapping.stringMap(indexName)?.stringMap("mappings") + var version = 0 + if (mappings!!.containsKey("_meta")) { + val meta = mappings.stringMap("_meta") + if (meta!!.containsKey("schema_version")) version = meta.get("schema_version") as Int + } + assertEquals(expectedVersion, version) + } + + @Suppress("UNCHECKED_CAST") + fun Map.stringMap(key: String): Map? { + val map = this as Map> + return map[key] + } + + fun RestClient.getIndexMapping(index: String): Map { + val response = this.makeRequest("GET", "$index/_mapping") + assertEquals(RestStatus.OK, response.restStatus()) + return response.asMap() + } } diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/IndexUtilsTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/IndexUtilsTests.kt new file mode 100644 index 000000000..c77c2ec5e --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/util/IndexUtilsTests.kt @@ -0,0 +1,86 @@ +package com.amazon.opendistroforelasticsearch.indexstatemanagement.util + +import org.elasticsearch.cluster.metadata.IndexMetaData +import org.elasticsearch.common.xcontent.XContentType +import org.elasticsearch.test.ESTestCase +import kotlin.test.assertFailsWith + +class IndexUtilsTests : ESTestCase() { + + fun `test get schema version`() { + val message = "{\"user\":{ \"name\":\"test\"},\"_meta\":{\"schema_version\": 1}}" + + val schemaVersion = IndexUtils.getSchemaVersion(message) + assertEquals(1, schemaVersion) + } + + fun `test get schema version without _meta`() { + val message = "{\"user\":{ \"name\":\"test\"}}" + + val schemaVersion = IndexUtils.getSchemaVersion(message) + assertEquals(1, schemaVersion) + } + + fun `test get schema version without schema_version`() { + val message = "{\"user\":{ \"name\":\"test\"},\"_meta\":{\"test\": 1}}" + + val schemaVersion = IndexUtils.getSchemaVersion(message) + assertEquals(1, schemaVersion) + } + + fun `test get schema version with negative schema_version`() { + val message = "{\"user\":{ \"name\":\"test\"},\"_meta\":{\"schema_version\": -1}}" + + assertFailsWith(IllegalArgumentException::class, "Expected IllegalArgumentException") { + IndexUtils.getSchemaVersion(message) + } + } + + fun `test get schema version with wrong schema_version`() { + val message = "{\"user\":{ \"name\":\"test\"},\"_meta\":{\"schema_version\": \"wrong\"}}" + + assertFailsWith(IllegalArgumentException::class, "Expected IllegalArgumentException") { + IndexUtils.getSchemaVersion(message) + } + } + + fun `test should update index without original version`() { + val indexContent = "{\"testIndex\":{\"settings\":{\"index\":{\"creation_date\":\"1558407515699\"," + + "\"number_of_shards\":\"1\",\"number_of_replicas\":\"1\",\"uuid\":\"t-VBBW6aR6KpJ3XP5iISOA\"," + + "\"version\":{\"created\":\"6040399\"},\"provided_name\":\"data_test\"}},\"mapping_version\":123," + + "\"settings_version\":123,\"mappings\":{\"_doc\":{\"properties\":{\"name\":{\"type\":\"keyword\"}}}}}}" + + val parser = createParser(XContentType.JSON.xContent(), indexContent) + val index: IndexMetaData = IndexMetaData.fromXContent(parser) + + val shouldUpdateIndex = IndexUtils.shouldUpdateIndex(index, 10) + assertTrue(shouldUpdateIndex) + } + + fun `test should update index with lagged version`() { + val indexContent = "{\"testIndex\":{\"settings\":{\"index\":{\"creation_date\":\"1558407515699\"," + + "\"number_of_shards\":\"1\",\"number_of_replicas\":\"1\",\"uuid\":\"t-VBBW6aR6KpJ3XP5iISOA\"," + + "\"version\":{\"created\":\"6040399\"},\"provided_name\":\"data_test\"}},\"mapping_version\":123," + + "\"settings_version\":123,\"mappings\":{\"_doc\":{\"_meta\":{\"schema_version\":1},\"properties\":" + + "{\"name\":{\"type\":\"keyword\"}}}}}}" + + val parser = createParser(XContentType.JSON.xContent(), indexContent) + val index: IndexMetaData = IndexMetaData.fromXContent(parser) + + val shouldUpdateIndex = IndexUtils.shouldUpdateIndex(index, 10) + assertTrue(shouldUpdateIndex) + } + + fun `test should update index with same version`() { + val indexContent = "{\"testIndex\":{\"settings\":{\"index\":{\"creation_date\":\"1558407515699\"," + + "\"number_of_shards\":\"1\",\"number_of_replicas\":\"1\",\"uuid\":\"t-VBBW6aR6KpJ3XP5iISOA\"," + + "\"version\":{\"created\":\"6040399\"},\"provided_name\":\"data_test\"}},\"mappings\":" + + "{\"_doc\":{\"_meta\":{\"schema_version\":1},\"properties\":{\"name\":{\"type\":\"keyword\"}}}}}}" + + val parser = createParser(XContentType.JSON.xContent(), indexContent) + val index: IndexMetaData = IndexMetaData.fromXContent(parser) + + val shouldUpdateIndex = IndexUtils.shouldUpdateIndex(index, 1) + assertFalse(shouldUpdateIndex) + } +} \ No newline at end of file