Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
…for-elasticsearch#202)

* check schema version for IndexConfig mapping

acknowledge false in case indexmetadata does not exist
jinsoor-amzn authored Apr 15, 2020
1 parent 03ab689 commit 5f22b5b
Showing 12 changed files with 384 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -118,7 +118,7 @@ class IndexStateManagementHistory(
// We have to pass null for newIndexName in order to get Elastic to increment the index count.
val request = RolloverRequest(IndexStateManagementIndices.HISTORY_WRITE_INDEX_ALIAS, null)
request.createIndexRequest.index(IndexStateManagementIndices.HISTORY_INDEX_PATTERN)
.mapping(_DOC, indexStateManagementIndices.indexStateManagementHistoryMappings, XContentType.JSON)
.mapping(_DOC, IndexStateManagementIndices.indexStateManagementHistoryMappings, XContentType.JSON)
request.addMaxIndexDocsCondition(historyMaxDocs)
request.addMaxIndexAgeCondition(historyMaxAge)
val response = client.admin().indices().rolloversIndex(request).actionGet()
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement

import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin.Companion.INDEX_STATE_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.IndexUtils
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.OpenForTesting
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util._DOC
import org.apache.logging.log4j.LogManager
@@ -27,6 +28,7 @@ import org.elasticsearch.action.admin.indices.create.CreateIndexRequest
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.Client
import org.elasticsearch.client.IndicesAdminClient
import org.elasticsearch.cluster.service.ClusterService
@@ -40,15 +42,21 @@ class IndexStateManagementIndices(

private val logger = LogManager.getLogger(javaClass)

val indexStateManagementMappings = javaClass.classLoader.getResource("mappings/opendistro-ism-config.json").readText()

val indexStateManagementHistoryMappings = javaClass.classLoader.getResource("mappings/opendistro-ism-history.json").readText()

fun initIndexStateManagementIndex(actionListener: ActionListener<CreateIndexResponse>) {
fun checkAndUpdateISMConfigIndex(actionListener: ActionListener<AcknowledgedResponse>) {
if (!indexStateManagementIndexExists()) {
val indexRequest = CreateIndexRequest(INDEX_STATE_MANAGEMENT_INDEX)
.mapping(_DOC, indexStateManagementMappings, XContentType.JSON)
client.create(indexRequest, actionListener)
client.create(indexRequest, object : ActionListener<CreateIndexResponse> {
override fun onFailure(e: Exception) {
actionListener.onFailure(e)
}

override fun onResponse(response: CreateIndexResponse) {
actionListener.onResponse(response)
}
})
} else {
IndexUtils.checkAndUpdateConfigIndexMapping(clusterService.state(), client, actionListener)
}
}

@@ -62,7 +70,7 @@ class IndexStateManagementIndices(
if (indexStateManagementIndexExists()) return true

return try {
val response: CreateIndexResponse = client.suspendUntil { initIndexStateManagementIndex(it) }
val response: AcknowledgedResponse = client.suspendUntil { checkAndUpdateISMConfigIndex(it) }
if (response.isAcknowledged) {
return true
}
@@ -118,5 +126,8 @@ class IndexStateManagementIndices(
const val HISTORY_WRITE_INDEX_ALIAS = "$HISTORY_INDEX_BASE-write"
const val HISTORY_INDEX_PATTERN = "<$HISTORY_INDEX_BASE-{now/d{yyyy.MM.dd}}-1>"
const val HISTORY_ALL = "$HISTORY_INDEX_BASE*"

val indexStateManagementMappings = javaClass.classLoader.getResource("mappings/opendistro-ism-config.json").readText()
val indexStateManagementHistoryMappings = javaClass.classLoader.getResource("mappings/opendistro-ism-history.json").readText()
}
}
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.model

import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.instant
import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.optionalTimeField
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.IndexUtils
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.WITH_TYPE
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.ToXContentObject
@@ -97,7 +98,7 @@ data class Policy(
var defaultState: String? = null
var errorNotification: ErrorNotification? = null
var lastUpdatedTime: Instant? = null
var schemaVersion: Long = 1
var schemaVersion: Long = IndexUtils.DEFAULT_SCHEMA_VERSION
val states: MutableList<State> = mutableListOf()

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
Original file line number Diff line number Diff line change
@@ -26,6 +26,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.coordina
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.FAILED_INDICES
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.FAILURES
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.FailedIndex
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.IndexUtils
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.UPDATED_INDICES
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.isSafeToChange
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.updateManagedIndexRequest
@@ -40,6 +41,7 @@ import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.action.support.IndicesOptions
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.Strings
@@ -107,6 +109,7 @@ class RestChangePolicyAction(
private val managedIndexUuids = mutableListOf<Pair<String, String>>()
private val indexUuidToCurrentState = mutableMapOf<String, String>()
lateinit var policy: Policy
lateinit var response: GetResponse

fun start() {
val getRequest = GetRequest(INDEX_STATE_MANAGEMENT_INDEX, changePolicy.policyID)
@@ -118,7 +121,19 @@ class RestChangePolicyAction(
if (!response.isExists || response.isSourceEmpty) {
return channel.sendResponse(BytesRestResponse(RestStatus.NOT_FOUND, "Could not find policy=${changePolicy.policyID}"))
}
this.response = response
IndexUtils.checkAndUpdateConfigIndexMapping(
clusterService.state(),
client.admin().indices(),
ActionListener.wrap(::onUpdateMapping, ::onFailure)
)
}

private fun onUpdateMapping(acknowledgedResponse: AcknowledgedResponse) {
if (!acknowledgedResponse.isAcknowledged) {
return channel.sendResponse(BytesRestResponse(RestStatus.FAILED_DEPENDENCY,
"Could not update $INDEX_STATE_MANAGEMENT_INDEX with new mapping."))
}
policy = XContentHelper.createParser(
channel.request().xContentRegistry,
LoggingDeprecationHandler.INSTANCE,
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy.C
import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings.Companion.ALLOW_LIST
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.IF_PRIMARY_TERM
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.IF_SEQ_NO
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.IndexUtils
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.REFRESH
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util._ID
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util._PRIMARY_TERM
@@ -32,10 +33,10 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getDisall
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.DocWriteRequest
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.action.index.IndexResponse
import org.elasticsearch.action.support.WriteRequest
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.node.NodeClient
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.settings.Settings
@@ -118,19 +119,15 @@ class RestIndexPolicyAction(
) : AsyncActionHandler(client, channel) {

fun start() {
if (!ismIndices.indexStateManagementIndexExists()) {
ismIndices.initIndexStateManagementIndex(ActionListener.wrap(::onCreateMappingsResponse, ::onFailure))
} else {
putPolicy()
}
ismIndices.checkAndUpdateISMConfigIndex(ActionListener.wrap(::onCreateMappingsResponse, ::onFailure))
}

private fun onCreateMappingsResponse(response: CreateIndexResponse) {
private fun onCreateMappingsResponse(response: AcknowledgedResponse) {
if (response.isAcknowledged) {
log.info("Created $INDEX_STATE_MANAGEMENT_INDEX with mappings.")
log.info("Successfully created or updated $INDEX_STATE_MANAGEMENT_INDEX with newest mappings.")
putPolicy()
} else {
log.error("Create $INDEX_STATE_MANAGEMENT_INDEX mappings call not acknowledged.")
log.error("Unable to create or update $INDEX_STATE_MANAGEMENT_INDEX with newest mapping.")
channel.sendResponse(
BytesRestResponse(
RestStatus.INTERNAL_SERVER_ERROR,
@@ -140,6 +137,8 @@ class RestIndexPolicyAction(
}

private fun putPolicy() {
newPolicy.copy(schemaVersion = IndexUtils.indexManagementSchemaVersion)

val indexRequest = IndexRequest(INDEX_STATE_MANAGEMENT_INDEX)
.setRefreshPolicy(refreshPolicy)
.source(newPolicy.toXContent(channel.newBuilder()))
Original file line number Diff line number Diff line change
@@ -154,7 +154,12 @@ class RestRetryFailedManagedIndexAction(
Pair(Index(managedIndexMetaData.index, managedIndexMetaData.indexUuid), managedIndexMetaData.copy(
stepMetaData = null,
policyRetryInfo = PolicyRetryInfoMetaData(false, 0),
actionMetaData = managedIndexMetaData.actionMetaData?.copy(failed = false, consumedRetries = 0, lastRetryTime = null, startTime = null),
actionMetaData = managedIndexMetaData.actionMetaData?.copy(
failed = false,
consumedRetries = 0,
lastRetryTime = null,
startTime = null
),
transitionTo = startState,
info = mapOf("message" to "Attempting to retry")
))
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement.util

import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementIndices
import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.IndicesAdminClient
import org.elasticsearch.cluster.ClusterState
import org.elasticsearch.cluster.metadata.IndexMetaData
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentType

class IndexUtils {
companion object {
@Suppress("ObjectPropertyNaming")
const val _META = "_meta"
const val SCHEMA_VERSION = "schema_version"
const val DEFAULT_SCHEMA_VERSION = 1L
val logger = LogManager.getLogger(IndexUtils::class.java)

var indexManagementSchemaVersion: Long
private set
var indexManagementHistoryVersion: Long
private set

init {
indexManagementSchemaVersion = getSchemaVersion(IndexStateManagementIndices.indexStateManagementMappings)
indexManagementHistoryVersion = getSchemaVersion(IndexStateManagementIndices.indexStateManagementHistoryMappings)
}

fun getSchemaVersion(mapping: String): Long {
val xcp = XContentType.JSON.xContent().createParser(NamedXContentRegistry.EMPTY,
LoggingDeprecationHandler.INSTANCE, mapping)

while (!xcp.isClosed) {
val token = xcp.currentToken()
if (token != null && token != XContentParser.Token.END_OBJECT && token != XContentParser.Token.START_OBJECT) {
if (xcp.currentName() != _META) {
xcp.nextToken()
xcp.skipChildren()
} else {
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
when (xcp.currentName()) {
SCHEMA_VERSION -> {
val version = xcp.longValue()
require(version > -1)
return version
}
else -> xcp.nextToken()
}
}
}
}
xcp.nextToken()
}
return DEFAULT_SCHEMA_VERSION
}

fun shouldUpdateIndex(index: IndexMetaData, newVersion: Long): Boolean {
var oldVersion = DEFAULT_SCHEMA_VERSION

val indexMapping = index.mapping()?.sourceAsMap()
if (indexMapping != null && indexMapping.containsKey(_META) && indexMapping[_META] is HashMap<*, *>) {
val metaData = indexMapping[_META] as HashMap<*, *>
if (metaData.containsKey(SCHEMA_VERSION)) {
oldVersion = (metaData[SCHEMA_VERSION] as Int).toLong()
}
}
return newVersion > oldVersion
}

fun checkAndUpdateConfigIndexMapping(
clusterState: ClusterState,
client: IndicesAdminClient,
actionListener: ActionListener<AcknowledgedResponse>
) {
checkAndUpdateIndexMapping(
IndexStateManagementPlugin.INDEX_STATE_MANAGEMENT_INDEX,
indexManagementSchemaVersion,
IndexStateManagementIndices.indexStateManagementMappings,
clusterState,
client,
actionListener
)
}

@OpenForTesting
fun checkAndUpdateIndexMapping(
index: String,
schemaVersion: Long,
mapping: String,
clusterState: ClusterState,
client: IndicesAdminClient,
actionListener: ActionListener<AcknowledgedResponse>
) {
if (clusterState.metaData.indices.containsKey(index)) {
if (shouldUpdateIndex(clusterState.metaData.indices[index], schemaVersion)) {
val putMappingRequest: PutMappingRequest = PutMappingRequest(index).type(_DOC).source(mapping, XContentType.JSON)
client.putMapping(putMappingRequest, actionListener)
} else {
actionListener.onResponse(AcknowledgedResponse(true))
}
} else {
logger.error("IndexMetaData does not exist for $index")
actionListener.onResponse(AcknowledgedResponse(false))
}
}
}
}
Original file line number Diff line number Diff line change
@@ -13,8 +13,7 @@
* permissions and limitations under the License.
*/

@file:Suppress("TopLevelPropertyNaming")

@file:Suppress("TopLevelPropertyNaming", "MatchingDeclarationName")
package com.amazon.opendistroforelasticsearch.indexstatemanagement.util

import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.optionalTimeField
@@ -38,9 +37,6 @@ const val REFRESH = "refresh"
const val WITH_TYPE = "with_type"
val XCONTENT_WITHOUT_TYPE = ToXContent.MapParams(mapOf(WITH_TYPE to "false"))

const val SCHEMA_VERSION = "schema_version"
const val NO_SCHEMA_VERSION = 0

const val FAILURES = "failures"
const val FAILED_INDICES = "failed_indices"
const val UPDATED_INDICES = "updated_indices"
3 changes: 3 additions & 0 deletions src/main/resources/mappings/opendistro-ism-config.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
{
"_meta" : {
"schema_version": 2
},
"dynamic": "strict",
"properties": {
"policy": {
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package com.amazon.opendistroforelasticsearch.indexstatemanagement

import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementIndices.Companion.indexStateManagementMappings
import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin.Companion.INDEX_STATE_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementPlugin.Companion.POLICY_BASE_URI
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ChangePolicy
import com.amazon.opendistroforelasticsearch.indexstatemanagement.resthandler.RestChangePolicyAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.FAILED_INDICES
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.FAILURES
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.UPDATED_INDICES
import org.apache.http.entity.ContentType
import org.apache.http.entity.StringEntity
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.rest.RestRequest
import org.elasticsearch.test.ESTestCase
import java.util.Locale

class IndexStateManagementIndicesIT : IndexStateManagementRestTestCase() {

private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT)

fun `test create index management`() {
val policy = randomPolicy()
val policyId = ESTestCase.randomAlphaOfLength(10)
client().makeRequest("PUT", "$POLICY_BASE_URI/$policyId", emptyMap(), policy.toHttpEntity())
assertIndexExists(INDEX_STATE_MANAGEMENT_INDEX)
verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 2)
}

fun `test update management index mapping with new schema version`() {
assertIndexDoesNotExist(INDEX_STATE_MANAGEMENT_INDEX)

val mapping = indexStateManagementMappings.trimStart('{').trimEnd('}')
.replace("\"schema_version\": 2", "\"schema_version\": 0")

createIndex(INDEX_STATE_MANAGEMENT_INDEX, Settings.EMPTY, mapping)
assertIndexExists(INDEX_STATE_MANAGEMENT_INDEX)
verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 0)
client().makeRequest("DELETE", "*")

val policy = randomPolicy()
val policyId = ESTestCase.randomAlphaOfLength(10)
client().makeRequest("PUT", "$POLICY_BASE_URI/$policyId", emptyMap(), policy.toHttpEntity())

assertIndexExists(INDEX_STATE_MANAGEMENT_INDEX)
verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 2)
}

fun `test changing policy on an index that hasn't initialized yet check schema version`() {
val policy = createRandomPolicy()
val newPolicy = createPolicy(randomPolicy(), "new_policy", true)
val indexName = "${testIndexName}_computer"
val (index) = createIndex(indexName, policy.id)

val managedIndexConfig = getExistingManagedIndexConfig(index)
assertNull("Change policy is not null", managedIndexConfig.changePolicy)
assertNull("Policy has already initialized", managedIndexConfig.policy)
assertEquals("Policy id does not match", policy.id, managedIndexConfig.policyID)

val mapping = "{" + indexStateManagementMappings.trimStart('{').trimEnd('}')
.replace("\"schema_version\": 2", "\"schema_version\": 0")

val entity = StringEntity(mapping, ContentType.APPLICATION_JSON)
client().makeRequest(RestRequest.Method.PUT.toString(),
"/$INDEX_STATE_MANAGEMENT_INDEX/_mapping", emptyMap(), entity)

verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 0)

// if we try to change policy now, it'll have no ManagedIndexMetaData yet and should succeed
val changePolicy = ChangePolicy(newPolicy.id, null, emptyList(), false)
val response = client().makeRequest(
RestRequest.Method.POST.toString(),
"${RestChangePolicyAction.CHANGE_POLICY_BASE_URI}/$index", emptyMap(), changePolicy.toHttpEntity())

verifyIndexSchemaVersion(INDEX_STATE_MANAGEMENT_INDEX, 2)

assertAffectedIndicesResponseIsEqual(mapOf(FAILURES to false, FAILED_INDICES to emptyList<Any>(), UPDATED_INDICES to 1), response.asMap())

waitFor { assertEquals(newPolicy.id, getManagedIndexConfig(index)?.changePolicy?.policyID) }
}
}
Original file line number Diff line number Diff line change
@@ -44,6 +44,7 @@ import org.apache.http.message.BasicHeader
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.client.Request
import org.elasticsearch.client.Response
import org.elasticsearch.client.RestClient
import org.elasticsearch.cluster.metadata.IndexMetaData
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.unit.TimeValue
@@ -508,4 +509,38 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() {
}
return true
}

protected fun assertIndexExists(index: String) {
val response = client().makeRequest("HEAD", index)
assertEquals("Index $index does not exist.", RestStatus.OK, response.restStatus())
}

protected fun assertIndexDoesNotExist(index: String) {
val response = client().makeRequest("HEAD", index)
assertEquals("Index $index does exist.", RestStatus.NOT_FOUND, response.restStatus())
}

protected fun verifyIndexSchemaVersion(index: String, expectedVersion: Int) {
val indexMapping = client().getIndexMapping(index)
val indexName = indexMapping.keys.toList()[0]
val mappings = indexMapping.stringMap(indexName)?.stringMap("mappings")
var version = 0
if (mappings!!.containsKey("_meta")) {
val meta = mappings.stringMap("_meta")
if (meta!!.containsKey("schema_version")) version = meta.get("schema_version") as Int
}
assertEquals(expectedVersion, version)
}

@Suppress("UNCHECKED_CAST")
fun Map<String, Any>.stringMap(key: String): Map<String, Any>? {
val map = this as Map<String, Map<String, Any>>
return map[key]
}

fun RestClient.getIndexMapping(index: String): Map<String, Any> {
val response = this.makeRequest("GET", "$index/_mapping")
assertEquals(RestStatus.OK, response.restStatus())
return response.asMap()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package com.amazon.opendistroforelasticsearch.indexstatemanagement.util

import org.elasticsearch.cluster.metadata.IndexMetaData
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.test.ESTestCase
import kotlin.test.assertFailsWith

class IndexUtilsTests : ESTestCase() {

fun `test get schema version`() {
val message = "{\"user\":{ \"name\":\"test\"},\"_meta\":{\"schema_version\": 1}}"

val schemaVersion = IndexUtils.getSchemaVersion(message)
assertEquals(1, schemaVersion)
}

fun `test get schema version without _meta`() {
val message = "{\"user\":{ \"name\":\"test\"}}"

val schemaVersion = IndexUtils.getSchemaVersion(message)
assertEquals(1, schemaVersion)
}

fun `test get schema version without schema_version`() {
val message = "{\"user\":{ \"name\":\"test\"},\"_meta\":{\"test\": 1}}"

val schemaVersion = IndexUtils.getSchemaVersion(message)
assertEquals(1, schemaVersion)
}

fun `test get schema version with negative schema_version`() {
val message = "{\"user\":{ \"name\":\"test\"},\"_meta\":{\"schema_version\": -1}}"

assertFailsWith(IllegalArgumentException::class, "Expected IllegalArgumentException") {
IndexUtils.getSchemaVersion(message)
}
}

fun `test get schema version with wrong schema_version`() {
val message = "{\"user\":{ \"name\":\"test\"},\"_meta\":{\"schema_version\": \"wrong\"}}"

assertFailsWith(IllegalArgumentException::class, "Expected IllegalArgumentException") {
IndexUtils.getSchemaVersion(message)
}
}

fun `test should update index without original version`() {
val indexContent = "{\"testIndex\":{\"settings\":{\"index\":{\"creation_date\":\"1558407515699\"," +
"\"number_of_shards\":\"1\",\"number_of_replicas\":\"1\",\"uuid\":\"t-VBBW6aR6KpJ3XP5iISOA\"," +
"\"version\":{\"created\":\"6040399\"},\"provided_name\":\"data_test\"}},\"mapping_version\":123," +
"\"settings_version\":123,\"mappings\":{\"_doc\":{\"properties\":{\"name\":{\"type\":\"keyword\"}}}}}}"

val parser = createParser(XContentType.JSON.xContent(), indexContent)
val index: IndexMetaData = IndexMetaData.fromXContent(parser)

val shouldUpdateIndex = IndexUtils.shouldUpdateIndex(index, 10)
assertTrue(shouldUpdateIndex)
}

fun `test should update index with lagged version`() {
val indexContent = "{\"testIndex\":{\"settings\":{\"index\":{\"creation_date\":\"1558407515699\"," +
"\"number_of_shards\":\"1\",\"number_of_replicas\":\"1\",\"uuid\":\"t-VBBW6aR6KpJ3XP5iISOA\"," +
"\"version\":{\"created\":\"6040399\"},\"provided_name\":\"data_test\"}},\"mapping_version\":123," +
"\"settings_version\":123,\"mappings\":{\"_doc\":{\"_meta\":{\"schema_version\":1},\"properties\":" +
"{\"name\":{\"type\":\"keyword\"}}}}}}"

val parser = createParser(XContentType.JSON.xContent(), indexContent)
val index: IndexMetaData = IndexMetaData.fromXContent(parser)

val shouldUpdateIndex = IndexUtils.shouldUpdateIndex(index, 10)
assertTrue(shouldUpdateIndex)
}

fun `test should update index with same version`() {
val indexContent = "{\"testIndex\":{\"settings\":{\"index\":{\"creation_date\":\"1558407515699\"," +
"\"number_of_shards\":\"1\",\"number_of_replicas\":\"1\",\"uuid\":\"t-VBBW6aR6KpJ3XP5iISOA\"," +
"\"version\":{\"created\":\"6040399\"},\"provided_name\":\"data_test\"}},\"mappings\":" +
"{\"_doc\":{\"_meta\":{\"schema_version\":1},\"properties\":{\"name\":{\"type\":\"keyword\"}}}}}}"

val parser = createParser(XContentType.JSON.xContent(), indexContent)
val index: IndexMetaData = IndexMetaData.fromXContent(parser)

val shouldUpdateIndex = IndexUtils.shouldUpdateIndex(index, 1)
assertFalse(shouldUpdateIndex)
}
}

0 comments on commit 5f22b5b

Please sign in to comment.