Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…for-elasticsearch#200)

* check schema version for IndexConfig mapping

acknowledge false in case indexmetadata does not exist
  • Loading branch information
jinsoor-amzn authored Apr 15, 2020
1 parent 03b0a2e commit 43cb503
Show file tree
Hide file tree
Showing 12 changed files with 384 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class IndexStateManagementHistory(
// We have to pass null for newIndexName in order to get Elastic to increment the index count.
val request = RolloverRequest(IndexStateManagementIndices.HISTORY_WRITE_INDEX_ALIAS, null)
request.createIndexRequest.index(IndexStateManagementIndices.HISTORY_INDEX_PATTERN)
.mapping(_DOC, indexStateManagementIndices.indexStateManagementHistoryMappings, XContentType.JSON)
.mapping(_DOC, IndexStateManagementIndices.indexStateManagementHistoryMappings, XContentType.JSON)
request.addMaxIndexDocsCondition(historyMaxDocs)
request.addMaxIndexAgeCondition(historyMaxAge)
val response = client.admin().indices().rolloversIndex(request).actionGet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement

import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin.Companion.INDEX_STATE_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.IndexUtils
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.OpenForTesting
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util._DOC
import org.apache.logging.log4j.LogManager
Expand All @@ -27,6 +28,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.Client
import org.elasticsearch.client.IndicesAdminClient
import org.elasticsearch.cluster.service.ClusterService
Expand All @@ -40,15 +42,21 @@ class IndexStateManagementIndices(

private val logger = LogManager.getLogger(javaClass)

val indexStateManagementMappings = javaClass.classLoader.getResource("mappings/opendistro-ism-config.json").readText()

val indexStateManagementHistoryMappings = javaClass.classLoader.getResource("mappings/opendistro-ism-history.json").readText()

fun initIndexStateManagementIndex(actionListener: ActionListener<CreateIndexResponse>) {
fun checkAndUpdateISMConfigIndex(actionListener: ActionListener<AcknowledgedResponse>) {
if (!indexStateManagementIndexExists()) {
val indexRequest = CreateIndexRequest(INDEX_STATE_MANAGEMENT_INDEX)
.mapping(_DOC, indexStateManagementMappings, XContentType.JSON)
client.create(indexRequest, actionListener)
client.create(indexRequest, object : ActionListener<CreateIndexResponse> {
override fun onFailure(e: Exception) {
actionListener.onFailure(e)
}

override fun onResponse(response: CreateIndexResponse) {
actionListener.onResponse(response)
}
})
} else {
IndexUtils.checkAndUpdateConfigIndexMapping(clusterService.state(), client, actionListener)
}
}

Expand All @@ -62,7 +70,7 @@ class IndexStateManagementIndices(
if (indexStateManagementIndexExists()) return true

return try {
val response: CreateIndexResponse = client.suspendUntil { initIndexStateManagementIndex(it) }
val response: AcknowledgedResponse = client.suspendUntil { checkAndUpdateISMConfigIndex(it) }
if (response.isAcknowledged) {
return true
}
Expand Down Expand Up @@ -118,5 +126,8 @@ class IndexStateManagementIndices(
const val HISTORY_WRITE_INDEX_ALIAS = "$HISTORY_INDEX_BASE-write"
const val HISTORY_INDEX_PATTERN = "<$HISTORY_INDEX_BASE-{now/d{yyyy.MM.dd}}-1>"
const val HISTORY_ALL = "$HISTORY_INDEX_BASE*"

val indexStateManagementMappings = javaClass.classLoader.getResource("mappings/opendistro-ism-config.json").readText()
val indexStateManagementHistoryMappings = javaClass.classLoader.getResource("mappings/opendistro-ism-history.json").readText()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.model

import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.instant
import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.optionalTimeField
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.IndexUtils
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.WITH_TYPE
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.ToXContentObject
Expand Down Expand Up @@ -97,7 +98,7 @@ data class Policy(
var defaultState: String? = null
var errorNotification: ErrorNotification? = null
var lastUpdatedTime: Instant? = null
var schemaVersion: Long = 1
var schemaVersion: Long = IndexUtils.DEFAULT_SCHEMA_VERSION
val states: MutableList<State> = mutableListOf()

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.coordina
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.FAILED_INDICES
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.FAILURES
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.FailedIndex
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.IndexUtils
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.UPDATED_INDICES
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.isSafeToChange
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.updateManagedIndexRequest
Expand All @@ -40,6 +41,7 @@ import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.action.support.IndicesOptions
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.Strings
Expand Down Expand Up @@ -107,6 +109,7 @@ class RestChangePolicyAction(
private val managedIndexUuids = mutableListOf<Pair<String, String>>()
private val indexUuidToCurrentState = mutableMapOf<String, String>()
lateinit var policy: Policy
lateinit var response: GetResponse

fun start() {
val getRequest = GetRequest(INDEX_STATE_MANAGEMENT_INDEX, changePolicy.policyID)
Expand All @@ -118,7 +121,19 @@ class RestChangePolicyAction(
if (!response.isExists || response.isSourceEmpty) {
return channel.sendResponse(BytesRestResponse(RestStatus.NOT_FOUND, "Could not find policy=${changePolicy.policyID}"))
}
this.response = response
IndexUtils.checkAndUpdateConfigIndexMapping(
clusterService.state(),
client.admin().indices(),
ActionListener.wrap(::onUpdateMapping, ::onFailure)
)
}

private fun onUpdateMapping(acknowledgedResponse: AcknowledgedResponse) {
if (!acknowledgedResponse.isAcknowledged) {
return channel.sendResponse(BytesRestResponse(RestStatus.FAILED_DEPENDENCY,
"Could not update $INDEX_STATE_MANAGEMENT_INDEX with new mapping."))
}
policy = XContentHelper.createParser(
channel.request().xContentRegistry,
LoggingDeprecationHandler.INSTANCE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy.C
import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings.Companion.ALLOW_LIST
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.IF_PRIMARY_TERM
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.IF_SEQ_NO
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.IndexUtils
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.REFRESH
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util._ID
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util._PRIMARY_TERM
Expand All @@ -32,10 +33,10 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getDisall
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.DocWriteRequest
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.action.index.IndexResponse
import org.elasticsearch.action.support.WriteRequest
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.settings.Settings
Expand Down Expand Up @@ -118,19 +119,15 @@ class RestIndexPolicyAction(
) : AsyncActionHandler(client, channel) {

fun start() {
if (!ismIndices.indexStateManagementIndexExists()) {
ismIndices.initIndexStateManagementIndex(ActionListener.wrap(::onCreateMappingsResponse, ::onFailure))
} else {
putPolicy()
}
ismIndices.checkAndUpdateISMConfigIndex(ActionListener.wrap(::onCreateMappingsResponse, ::onFailure))
}

private fun onCreateMappingsResponse(response: CreateIndexResponse) {
private fun onCreateMappingsResponse(response: AcknowledgedResponse) {
if (response.isAcknowledged) {
log.info("Created $INDEX_STATE_MANAGEMENT_INDEX with mappings.")
log.info("Successfully created or updated $INDEX_STATE_MANAGEMENT_INDEX with newest mappings.")
putPolicy()
} else {
log.error("Create $INDEX_STATE_MANAGEMENT_INDEX mappings call not acknowledged.")
log.error("Unable to create or update $INDEX_STATE_MANAGEMENT_INDEX with newest mapping.")
channel.sendResponse(
BytesRestResponse(
RestStatus.INTERNAL_SERVER_ERROR,
Expand All @@ -140,6 +137,8 @@ class RestIndexPolicyAction(
}

private fun putPolicy() {
newPolicy.copy(schemaVersion = IndexUtils.indexManagementSchemaVersion)

val indexRequest = IndexRequest(INDEX_STATE_MANAGEMENT_INDEX)
.setRefreshPolicy(refreshPolicy)
.source(newPolicy.toXContent(channel.newBuilder()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,12 @@ class RestRetryFailedManagedIndexAction(
Pair(Index(managedIndexMetaData.index, managedIndexMetaData.indexUuid), managedIndexMetaData.copy(
stepMetaData = null,
policyRetryInfo = PolicyRetryInfoMetaData(false, 0),
actionMetaData = managedIndexMetaData.actionMetaData?.copy(failed = false, consumedRetries = 0, lastRetryTime = null, startTime = null),
actionMetaData = managedIndexMetaData.actionMetaData?.copy(
failed = false,
consumedRetries = 0,
lastRetryTime = null,
startTime = null
),
transitionTo = startState,
info = mapOf("message" to "Attempting to retry")
))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement.util

import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementIndices
import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.IndicesAdminClient
import org.elasticsearch.cluster.ClusterState
import org.elasticsearch.cluster.metadata.IndexMetaData
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentType

class IndexUtils {
companion object {
@Suppress("ObjectPropertyNaming")
const val _META = "_meta"
const val SCHEMA_VERSION = "schema_version"
const val DEFAULT_SCHEMA_VERSION = 1L
val logger = LogManager.getLogger(IndexUtils::class.java)

var indexManagementSchemaVersion: Long
private set
var indexManagementHistoryVersion: Long
private set

init {
indexManagementSchemaVersion = getSchemaVersion(IndexStateManagementIndices.indexStateManagementMappings)
indexManagementHistoryVersion = getSchemaVersion(IndexStateManagementIndices.indexStateManagementHistoryMappings)
}

fun getSchemaVersion(mapping: String): Long {
val xcp = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE, mapping)

while (!xcp.isClosed) {
val token = xcp.currentToken()
if (token != null && token != XContentParser.Token.END_OBJECT && token != XContentParser.Token.START_OBJECT) {
if (xcp.currentName() != _META) {
xcp.nextToken()
xcp.skipChildren()
} else {
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
when (xcp.currentName()) {
SCHEMA_VERSION -> {
val version = xcp.longValue()
require(version > -1)
return version
}
else -> xcp.nextToken()
}
}
}
}
xcp.nextToken()
}
return DEFAULT_SCHEMA_VERSION
}

fun shouldUpdateIndex(index: IndexMetaData, newVersion: Long): Boolean {
var oldVersion = DEFAULT_SCHEMA_VERSION

val indexMapping = index.mapping()?.sourceAsMap()
if (indexMapping != null && indexMapping.containsKey(_META) && indexMapping[_META] is HashMap<*, *>) {
val metaData = indexMapping[_META] as HashMap<*, *>
if (metaData.containsKey(SCHEMA_VERSION)) {
oldVersion = (metaData[SCHEMA_VERSION] as Int).toLong()
}
}
return newVersion > oldVersion
}

fun checkAndUpdateConfigIndexMapping(
clusterState: ClusterState,
client: IndicesAdminClient,
actionListener: ActionListener<AcknowledgedResponse>
) {
checkAndUpdateIndexMapping(
IndexStateManagementPlugin.INDEX_STATE_MANAGEMENT_INDEX,
indexManagementSchemaVersion,
IndexStateManagementIndices.indexStateManagementMappings,
clusterState,
client,
actionListener
)
}

@OpenForTesting
fun checkAndUpdateIndexMapping(
index: String,
schemaVersion: Long,
mapping: String,
clusterState: ClusterState,
client: IndicesAdminClient,
actionListener: ActionListener<AcknowledgedResponse>
) {
if (clusterState.metaData.indices.containsKey(index)) {
if (shouldUpdateIndex(clusterState.metaData.indices[index], schemaVersion)) {
val putMappingRequest: PutMappingRequest = PutMappingRequest(index).type(_DOC).source(mapping, XContentType.JSON)
client.putMapping(putMappingRequest, actionListener)
} else {
actionListener.onResponse(AcknowledgedResponse(true))
}
} else {
logger.error("IndexMetaData does not exist for $index")
actionListener.onResponse(AcknowledgedResponse(false))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
* permissions and limitations under the License.
*/

@file:Suppress("TopLevelPropertyNaming")

@file:Suppress("TopLevelPropertyNaming", "MatchingDeclarationName")
package com.amazon.opendistroforelasticsearch.indexstatemanagement.util

import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.optionalTimeField
Expand All @@ -38,9 +37,6 @@ const val REFRESH = "refresh"
const val WITH_TYPE = "with_type"
val XCONTENT_WITHOUT_TYPE = ToXContent.MapParams(mapOf(WITH_TYPE to "false"))

const val SCHEMA_VERSION = "schema_version"
const val NO_SCHEMA_VERSION = 0

const val FAILURES = "failures"
const val FAILED_INDICES = "failed_indices"
const val UPDATED_INDICES = "updated_indices"
Expand Down
3 changes: 3 additions & 0 deletions src/main/resources/mappings/opendistro-ism-config.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
{
"_meta" : {
"schema_version": 2
},
"dynamic": "strict",
"properties": {
"policy": {
Expand Down
Loading

0 comments on commit 43cb503

Please sign in to comment.