From f05b774644d66a33f86cb915bf7aad1edda6a8d9 Mon Sep 17 00:00:00 2001 From: Drew Baugher <46505179+dbbaughe@users.noreply.github.com> Date: Tue, 14 Jul 2020 13:48:02 -0700 Subject: [PATCH] Adds support for multi-node run/testing and updates tests (#254) * Adds support for multi-node run/testing and updates tests * Remove waitFors around change policy API in tests * Revert debug system property --- README.md | 8 +- build.gradle | 38 ++++- .../IndexStateManagementRestTestCase.kt | 64 ++++---- .../action/ActionRetryIT.kt | 5 +- .../action/ForceMergeActionIT.kt | 20 +-- .../action/SnapshotActionIT.kt | 13 +- .../coordinator/ManagedIndexCoordinatorIT.kt | 14 +- .../resthandler/RestChangePolicyActionIT.kt | 140 +++++++++--------- .../runner/ManagedIndexRunnerIT.kt | 4 +- 9 files changed, 183 insertions(+), 123 deletions(-) diff --git a/README.md b/README.md index 84683a075..62180cd6e 100644 --- a/README.md +++ b/README.md @@ -77,9 +77,11 @@ This project currently uses the Notification subproject from the [Alerting plugi 1. `./gradlew build` builds and tests project. 2. `./gradlew run` launches a single node cluster with the index management (and job-scheduler) plugin installed. -3. `./gradlew integTest` launches a single node cluster with the index management (and job-scheduler) plugin installed and runs all integ tests. -4. `./gradlew integTest -Dtests.class=*RestChangePolicyActionIT` runs a single integ class -5. `./gradlew integTest -Dtests.class=*RestChangePolicyActionIT -Dtests.method="test missing index"` runs a single integ test method (remember to quote the test method name if it contains spaces) +3. `./gradlew run -PnumNodes=3` launches a multi-node cluster with the index management (and job-scheduler) plugin installed. +4. `./gradlew integTest` launches a single node cluster with the index management (and job-scheduler) plugin installed and runs all integ tests. +5. `./gradlew integTest -PnumNodes=3` launches a multi-node cluster with the index management (and job-scheduler) plugin installed and runs all integ tests. +6. `./gradlew integTest -Dtests.class=*RestChangePolicyActionIT` runs a single integ class +7. `./gradlew integTest -Dtests.class=*RestChangePolicyActionIT -Dtests.method="test missing index"` runs a single integ test method (remember to quote the test method name if it contains spaces) When launching a cluster using one of the above commands, logs are placed in `build/testclusters/integTest-0/logs`. Though the logs are teed to the console, in practices it's best to check the actual log file. diff --git a/build.gradle b/build.gradle index 599b29baa..689ae3b74 100644 --- a/build.gradle +++ b/build.gradle @@ -48,7 +48,11 @@ apply plugin: 'elasticsearch.testclusters' apply plugin: 'io.gitlab.arturbosch.detekt' apply plugin: 'org.jetbrains.kotlin.jvm' apply plugin: 'org.jetbrains.kotlin.plugin.allopen' -if (!System.properties.containsKey('tests.rest.cluster') && !System.properties.containsKey('tests.cluster')) { + +def usingRemoteCluster = System.properties.containsKey('tests.rest.cluster') || System.properties.containsKey('tests.cluster') +def usingMultiNode = project.properties.containsKey('numNodes') +// Only apply jacoco test coverage if we are running a local single node cluster +if (!usingRemoteCluster && !usingMultiNode) { apply from: 'build-tools/esplugin-coverage.gradle' } check.dependsOn jacocoTestReport @@ -143,21 +147,41 @@ test { systemProperty 'tests.security.manager', 'false' } +File repo = file("$buildDir/testclusters/repo") +def _numNodes = findProperty('numNodes') as Integer ?: 1 testClusters.integTest { testDistribution = "OSS" + // Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1 + if (_numNodes > 1) numberOfNodes = _numNodes + // When running integration tests it doesn't forward the --debug-jvm to the cluster anymore + // i.e. we have to use a custom property to flag when we want to debug elasticsearch JVM + // since we also support multi node integration tests we increase debugPort per node if (System.getProperty("cluster.debug") != null) { - jvmArgs('-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=5005') + def debugPort = 5005 + nodes.forEach { node -> + node.jvmArgs("-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=*:${debugPort}") + debugPort += 1 + } } plugin(fileTree("src/test/resources/job-scheduler").getSingleFile()) + setting 'path.repo', repo.absolutePath } integTest.runner { systemProperty 'tests.security.manager', 'false' systemProperty 'java.io.tmpdir', es_tmp_dir.absolutePath + systemProperty 'tests.path.repo', repo.absolutePath // Tell the test JVM if the cluster JVM is running under a debugger so that tests can use longer timeouts for // requests. The 'doFirst' delays reading the debug setting on the cluster till execution time. doFirst { systemProperty 'cluster.debug', getDebug() + // Set number of nodes system property to be used in tests + systemProperty 'cluster.number_of_nodes', "${_numNodes}" + // There seems to be an issue when running multi node run or integ tasks with unicast_hosts + // not being written, the waitForAllConditions ensures it's written + getClusters().forEach { cluster -> + cluster.waitForAllConditions() + } } // The -Dcluster.debug option makes the cluster debuggable; this makes the tests debuggable @@ -166,6 +190,16 @@ integTest.runner { } } +run { + doFirst { + // There seems to be an issue when running multi node run or integ tasks with unicast_hosts + // not being written, the waitForAllConditions ensures it's written + getClusters().forEach { cluster -> + cluster.waitForAllConditions() + } + } +} + task ktlint(type: JavaExec, group: "verification") { description = "Check Kotlin code style." main = "com.pinterest.ktlint.Main" diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt index c2e556b50..14454922d 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt @@ -42,6 +42,7 @@ import org.apache.http.entity.ContentType.APPLICATION_JSON import org.apache.http.entity.StringEntity import org.apache.http.message.BasicHeader import org.elasticsearch.ElasticsearchParseException +import org.elasticsearch.action.get.GetResponse import org.elasticsearch.action.search.SearchResponse import org.elasticsearch.client.Request import org.elasticsearch.client.Response @@ -79,6 +80,7 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() { private val isDebuggingTest = DisableOnDebug(null).isDebugging private val isDebuggingRemoteCluster = System.getProperty("cluster.debug", "false")!!.toBoolean() + private val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1 fun Response.asMap(): Map = entityAsMap(this) @@ -251,6 +253,17 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() { } } + protected fun getManagedIndexConfigByDocId(id: String): ManagedIndexConfig? { + val response = client().makeRequest("GET", "$INDEX_STATE_MANAGEMENT_INDEX/_doc/$id") + assertEquals("Request failed", RestStatus.OK, response.restStatus()) + val getResponse = GetResponse.fromXContent(createParser(jsonXContent, response.entity.content)) + assertTrue("Did not find managed index config", getResponse.isExists) + return getResponse?.run { + val xcp = createParser(jsonXContent, sourceAsBytesRef) + ManagedIndexConfig.parseWithType(xcp, id, seqNo, primaryTerm) + } + } + @Suppress("UNCHECKED_CAST") protected fun getHistorySearchResponse(index: String): SearchResponse { val request = """ @@ -293,7 +306,8 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() { val intervalSchedule = (update.jobSchedule as IntervalSchedule) val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis() val startTimeMillis = desiredStartTimeMillis ?: Instant.now().toEpochMilli() - millis - val response = client().makeRequest("POST", "$INDEX_STATE_MANAGEMENT_INDEX/_update/${update.id}", + val waitForActiveShards = if (isMultiNode) "all" else "1" + val response = client().makeRequest("POST", "$INDEX_STATE_MANAGEMENT_INDEX/_update/${update.id}?wait_for_active_shards=$waitForActiveShards", StringEntity( "{\"doc\":{\"managed_index\":{\"schedule\":{\"interval\":{\"start_time\":" + "\"$startTimeMillis\"}}}}}", @@ -339,23 +353,31 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() { } } + // Validate segment count per shard by specifying the min and max it should be @Suppress("UNCHECKED_CAST") - protected fun getSegmentCount(index: String): Int { - val statsResponse: Map = getStats(index) - - // Assert that shard count of stats response is 1 since the stats request being used is at the index level - // (meaning the segment count in the response is aggregated) but segment count for force merge - // (which this method is primarily being used for) is going to be validated per shard - val shardsInfo = statsResponse["_shards"] as Map - assertEquals("Shard count higher than expected", 1, shardsInfo["successful"]) - - val indicesStats = statsResponse["indices"] as Map>>> - return indicesStats[index]!!["primaries"]!!["segments"]!!["count"] as Int + protected fun validateSegmentCount(index: String, min: Int? = null, max: Int? = null): Boolean { + if (min == null && max == null) throw IllegalArgumentException("Must provide at least a min or max") + val statsResponse: Map = getShardSegmentStats(index) + + val indicesStats = statsResponse["indices"] as Map>>>>> + return indicesStats[index]!!["shards"]!!.values.all { list -> + list.filter { it["routing"]!!["primary"] == true }.all { + logger.info("Checking primary shard segments for $it") + if (it["routing"]!!["state"] != "STARTED") { + false + } else { + val count = it["segments"]!!["count"] as Int + if (min != null && count < min) return false + if (max != null && count > max) return false + return true + } + } + } } - /** Get stats for [index] */ - private fun getStats(index: String): Map { - val response = client().makeRequest("GET", "/$index/_stats") + /** Get shard segment stats for [index] */ + private fun getShardSegmentStats(index: String): Map { + val response = client().makeRequest("GET", "/$index/_stats/segments?level=shards") assertEquals("Stats request failed", RestStatus.OK, response.restStatus()) @@ -440,17 +462,7 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() { assertEquals("Unable to create a new repository", RestStatus.OK, response.restStatus()) } - @Suppress("UNCHECKED_CAST") - private fun getRepoPath(): String { - val response = client() - .makeRequest( - "GET", - "_nodes", - emptyMap() - ) - assertEquals("Unable to get a nodes settings", RestStatus.OK, response.restStatus()) - return ((response.asMap()["nodes"] as HashMap>>>).values.first()["settings"]!!["path"]!!["repo"] as List)[0] - } + private fun getRepoPath(): String = System.getProperty("tests.path.repo") private fun getSnapshotsList(repository: String): List { val response = client() diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionRetryIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionRetryIT.kt index c974e95c6..059a976db 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionRetryIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ActionRetryIT.kt @@ -123,11 +123,12 @@ class ActionRetryIT : IndexStateManagementRestTestCase() { // Second execution is to fail the step once. updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) + + waitFor { assertEquals(1, getExplainManagedIndexMetaData(indexName).actionMetaData?.consumedRetries) } // Third execution should not run job since we have the retry backoff. updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) + Thread.sleep(5000) // currently there is nothing to compare when backing off so we have to sleep // Fourth execution should not run job since we have the retry backoff. updateManagedIndexConfigStartTime(managedIndexConfig) diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ForceMergeActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ForceMergeActionIT.kt index 65121cf60..d153b30b7 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ForceMergeActionIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/ForceMergeActionIT.kt @@ -55,7 +55,7 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Add sample data to increase segment count, passing in a delay to ensure multiple segments get created insertSampleData(indexName, 3, 1000) - waitFor { assertTrue("Segment count for [$indexName] was less than expected", getSegmentCount(indexName) > 1) } + waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) } val managedIndexConfig = getExistingManagedIndexConfig(indexName) @@ -72,7 +72,7 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Third execution: Force merge operation is kicked off updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) + // verify we set maxNumSegments in action properties when kicking off force merge waitFor { assertEquals( @@ -84,9 +84,8 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Fourth execution: Waits for force merge to complete, which will happen in this execution since index is small updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) - waitFor { assertEquals("Segment count for [$indexName] after force merge is incorrect", 1, getSegmentCount(indexName)) } + waitFor { assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1)) } // verify we reset actionproperties at end of forcemerge waitFor { assertNull("maxNumSegments was not reset", getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties) } // index should still be readonly after force merge finishes @@ -117,7 +116,7 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Add sample data to increase segment count, passing in a delay to ensure multiple segments get created insertSampleData(indexName, 3, 1000) - waitFor { assertTrue("Segment count for [$indexName] was less than expected", getSegmentCount(indexName) > 1) } + waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) } // Set index to read-only updateIndexSettings(indexName, Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true)) @@ -133,17 +132,20 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Second execution: Index was already read-only and should remain so for force_merge updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { assertEquals("Set index to read-only", getExplainManagedIndexMetaData(indexName).info?.get("message")) } + waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } // Third execution: Force merge operation is kicked off updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) + + waitFor { assertEquals("Started force merge", getExplainManagedIndexMetaData(indexName).info?.get("message")) } // Fourth execution: Waits for force merge to complete, which will happen in this execution since index is small updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) - waitFor { assertEquals("Segment count for [$indexName] after force merge is incorrect", 1, getSegmentCount(indexName)) } - waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } + waitFor { assertEquals("Force merge completed", getExplainManagedIndexMetaData(indexName).info?.get("message")) } + assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1)) + assertEquals("true", getIndexBlocksWriteSetting(indexName)) } } diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/SnapshotActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/SnapshotActionIT.kt index f460f3649..094b299a3 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/SnapshotActionIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/SnapshotActionIT.kt @@ -30,8 +30,8 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() { private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT) fun `test basic`() { - val indexName = "${testIndexName}_index_1" - val policyID = "${testIndexName}_testPolicyName_1" + val indexName = "${testIndexName}_index_basic" + val policyID = "${testIndexName}_policy_basic" val repository = "repository" val snapshot = "snapshot" val actionConfig = SnapshotActionConfig(repository, snapshot, 0) @@ -62,15 +62,14 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() { // Need to wait two cycles for wait for snapshot step updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) waitFor { assertSnapshotExists(repository, snapshot) } waitFor { assertSnapshotFinishedWithSuccess(repository, snapshot) } } fun `test successful wait for snapshot step`() { - val indexName = "${testIndexName}_index_1" - val policyID = "${testIndexName}_testPolicyName_1" + val indexName = "${testIndexName}_index_success" + val policyID = "${testIndexName}_policy_success" val repository = "repository" val snapshot = "snapshot_success_test" val actionConfig = SnapshotActionConfig(repository, snapshot, 0) @@ -118,8 +117,8 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() { } fun `test failed wait for snapshot step`() { - val indexName = "${testIndexName}_index_1" - val policyID = "${testIndexName}_testPolicyName_1" + val indexName = "${testIndexName}_index_failed" + val policyID = "${testIndexName}_policy_failed" val repository = "repository" val snapshot = "snapshot_failed_test" val actionConfig = SnapshotActionConfig(repository, snapshot, 0) diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt index bd25c24dc..04bc57db3 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt @@ -168,8 +168,9 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() { // Confirm job was disabled val disabledManagedIndexConfig: ManagedIndexConfig = waitFor { - val config = getExistingManagedIndexConfig(indexName) - assertEquals("ManagedIndexConfig was not disabled", false, config.enabled) + val config = getManagedIndexConfigByDocId(managedIndexConfig.id) + assertNotNull("Could not find ManagedIndexConfig", config) + assertEquals("ManagedIndexConfig was not disabled", false, config!!.enabled) config } @@ -239,7 +240,7 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() { // Add sample data to increase segment count, passing in a delay to ensure multiple segments get created insertSampleData(indexName, 3, 1000) - waitFor { assertTrue("Segment count for [$indexName] was less than expected", getSegmentCount(indexName) > 1) } + waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) } val managedIndexConfig = getExistingManagedIndexConfig(indexName) @@ -258,7 +259,6 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() { // Third execution: Force merge operation is kicked off updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) // Verify maxNumSegments is set in action properties when kicking off force merge waitFor { @@ -274,13 +274,15 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() { // Fourth execution: WaitForForceMergeStep is not safe to disable on, so the job should not disable yet updateManagedIndexConfigStartTime(managedIndexConfig) - Thread.sleep(3000) + + // Confirm we successfully executed the WaitForForceMergeStep + waitFor { assertEquals("Force merge completed", getExplainManagedIndexMetaData(indexName).info?.get("message")) } // Confirm job was not disabled assertEquals("ManagedIndexConfig was disabled early", true, getExistingManagedIndexConfig(indexName).enabled) // Validate segments were merged - waitFor { assertEquals("Segment count for [$indexName] after force merge is incorrect", 1, getSegmentCount(indexName)) } + assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1)) // Fifth execution: Attempt transition, which is safe to disable on, so job should be disabled updateManagedIndexConfigStartTime(managedIndexConfig) diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt index f4aa5af61..a246ea1f3 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/resthandler/RestChangePolicyActionIT.kt @@ -200,15 +200,15 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { assertAffectedIndicesResponseIsEqual(mapOf(FAILURES to false, FAILED_INDICES to emptyList(), UPDATED_INDICES to 1), response.asMap()) - waitFor { assertEquals(newPolicy.id, getManagedIndexConfig(index)?.changePolicy?.policyID) } + waitFor { assertEquals(newPolicy.id, getManagedIndexConfigByDocId(managedIndexConfig.id)?.changePolicy?.policyID) } // speed up to first execution where we initialize the policy on the job updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals(newPolicy.id, getManagedIndexConfig(index)?.policyID) } + waitFor { assertEquals(newPolicy.id, getManagedIndexConfigByDocId(managedIndexConfig.id)?.policyID) } // The initialized policy should be the change policy one - val updatedManagedIndexConfig = getManagedIndexConfig(index) + val updatedManagedIndexConfig = getManagedIndexConfigByDocId(managedIndexConfig.id) assertNotNull("Updated managed index config is null", updatedManagedIndexConfig) assertNull("Updated change policy is not null", updatedManagedIndexConfig!!.changePolicy) assertEquals("Initialized policyId is not the change policy id", newPolicy.id, updatedManagedIndexConfig.policyID) @@ -256,7 +256,7 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { assertAffectedIndicesResponseIsEqual(expectedResponse, response.asMap()) waitFor { - val updatedManagedIndexConfig = getManagedIndexConfig(index) + val updatedManagedIndexConfig = getManagedIndexConfigByDocId(managedIndexConfig.id) assertNotNull("Updated managed index config is null", updatedManagedIndexConfig) assertNotNull("Updated change policy is null", updatedManagedIndexConfig!!.changePolicy) assertEquals("Updated change policy policy id does not match", newPolicy.id, updatedManagedIndexConfig.changePolicy!!.policyID) @@ -297,7 +297,7 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { // After first execution we should expect the change policy to still be null (since we haven't called it yet) // and the initial policy should of been cached val executedManagedIndexConfig: ManagedIndexConfig = waitFor { - val config = getManagedIndexConfig(index) + val config = getManagedIndexConfigByDocId(managedIndexConfig.id) assertNotNull("Executed managed index config is null", config) assertNull("Executed change policy is not null", config!!.changePolicy) assertNotNull("Executed policy is null", config.policy) @@ -307,26 +307,28 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { } // We should expect the explain API to show an initialized ManagedIndexMetaData with the default state from the initial policy - val explainResponseMap = getExplainMap(index) - assertPredicatesOnMetaData( - listOf( - index to listOf( - ManagedIndexSettings.POLICY_ID.key to policy.id::equals, - ManagedIndexMetaData.INDEX to executedManagedIndexConfig.index::equals, - ManagedIndexMetaData.INDEX_UUID to executedManagedIndexConfig.indexUuid::equals, - ManagedIndexMetaData.POLICY_ID to executedManagedIndexConfig.policyID::equals, - StateMetaData.STATE to fun(stateMetaDataMap: Any?): Boolean = - assertStateEquals(StateMetaData(policy.defaultState, Instant.now().toEpochMilli()), stateMetaDataMap) - ) - ), explainResponseMap, false) + waitFor { + val explainResponseMap = getExplainMap(index) + assertPredicatesOnMetaData( + listOf( + index to listOf( + ManagedIndexSettings.POLICY_ID.key to policy.id::equals, + ManagedIndexMetaData.INDEX to executedManagedIndexConfig.index::equals, + ManagedIndexMetaData.INDEX_UUID to executedManagedIndexConfig.indexUuid::equals, + ManagedIndexMetaData.POLICY_ID to executedManagedIndexConfig.policyID::equals, + StateMetaData.STATE to fun(stateMetaDataMap: Any?): Boolean = + assertStateEquals(StateMetaData(policy.defaultState, Instant.now().toEpochMilli()), stateMetaDataMap) + ) + ), explainResponseMap, false) + } val changePolicy = ChangePolicy(newPolicy.id, null, emptyList(), false) val response = client().makeRequest(RestRequest.Method.POST.toString(), - "${RestChangePolicyAction.CHANGE_POLICY_BASE_URI}/$index", emptyMap(), changePolicy.toHttpEntity()) + "${RestChangePolicyAction.CHANGE_POLICY_BASE_URI}/$index", emptyMap(), changePolicy.toHttpEntity()) val expectedResponse = mapOf( - FAILURES to false, - FAILED_INDICES to emptyList(), - UPDATED_INDICES to 1 + FAILURES to false, + FAILED_INDICES to emptyList(), + UPDATED_INDICES to 1 ) assertAffectedIndicesResponseIsEqual(expectedResponse, response.asMap()) @@ -335,7 +337,7 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { - val config = getManagedIndexConfig(index) + val config = getManagedIndexConfigByDocId(managedIndexConfig.id) assertNotNull("Next managed index config is null", config) assertNotNull("Next change policy is null", config!!.changePolicy) assertNotNull("Next policy is null", config.policy) @@ -346,26 +348,28 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { } // We should expect the explain API to show us in the ReadOnlyAction - assertPredicatesOnMetaData( - listOf( - index to listOf( - ManagedIndexSettings.POLICY_ID.key to policy.id::equals, - ManagedIndexMetaData.INDEX to executedManagedIndexConfig.index::equals, - ManagedIndexMetaData.INDEX_UUID to executedManagedIndexConfig.indexUuid::equals, - ManagedIndexMetaData.POLICY_ID to executedManagedIndexConfig.policyID::equals, - StateMetaData.STATE to fun(stateMetaDataMap: Any?): Boolean = + waitFor { + assertPredicatesOnMetaData( + listOf( + index to listOf( + ManagedIndexSettings.POLICY_ID.key to policy.id::equals, + ManagedIndexMetaData.INDEX to executedManagedIndexConfig.index::equals, + ManagedIndexMetaData.INDEX_UUID to executedManagedIndexConfig.indexUuid::equals, + ManagedIndexMetaData.POLICY_ID to executedManagedIndexConfig.policyID::equals, + StateMetaData.STATE to fun(stateMetaDataMap: Any?): Boolean = assertStateEquals(StateMetaData(policy.defaultState, Instant.now().toEpochMilli()), stateMetaDataMap), - ActionMetaData.ACTION to fun(actionMetaDataMap: Any?): Boolean = + ActionMetaData.ACTION to fun(actionMetaDataMap: Any?): Boolean = assertActionEquals(ActionMetaData(name = ActionConfig.ActionType.READ_ONLY.type, startTime = Instant.now().toEpochMilli(), index = 0, - failed = false, consumedRetries = 0, lastRetryTime = null, actionProperties = null), actionMetaDataMap) - ) - ), getExplainMap(index), false) + failed = false, consumedRetries = 0, lastRetryTime = null, actionProperties = null), actionMetaDataMap) + ) + ), getExplainMap(index), false) + } // speed up to third execution so that we try to move to transitions and trigger a change policy updateManagedIndexConfigStartTime(managedIndexConfig) val changedManagedIndexConfig: ManagedIndexConfig = waitFor { - val config = getManagedIndexConfig(index) + val config = getManagedIndexConfigByDocId(managedIndexConfig.id) assertNotNull("Changed managed index config is null", config) assertNull("Changed change policy is not null", config!!.changePolicy) assertNotNull("Changed policy is null", config.policy) @@ -375,20 +379,22 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { } // We should expect the explain API to show us with the new policy - assertPredicatesOnMetaData( - listOf( - index to listOf( - ManagedIndexSettings.POLICY_ID.key to newPolicy.id::equals, - ManagedIndexMetaData.INDEX to changedManagedIndexConfig.index::equals, - ManagedIndexMetaData.INDEX_UUID to changedManagedIndexConfig.indexUuid::equals, - ManagedIndexMetaData.POLICY_ID to changedManagedIndexConfig.policyID::equals, - StateMetaData.STATE to fun(stateMetaDataMap: Any?): Boolean = + waitFor { + assertPredicatesOnMetaData( + listOf( + index to listOf( + ManagedIndexSettings.POLICY_ID.key to newPolicy.id::equals, + ManagedIndexMetaData.INDEX to changedManagedIndexConfig.index::equals, + ManagedIndexMetaData.INDEX_UUID to changedManagedIndexConfig.indexUuid::equals, + ManagedIndexMetaData.POLICY_ID to changedManagedIndexConfig.policyID::equals, + StateMetaData.STATE to fun(stateMetaDataMap: Any?): Boolean = assertStateEquals(StateMetaData(policy.defaultState, Instant.now().toEpochMilli()), stateMetaDataMap), - ActionMetaData.ACTION to fun(actionMetaDataMap: Any?): Boolean = + ActionMetaData.ACTION to fun(actionMetaDataMap: Any?): Boolean = assertActionEquals(ActionMetaData(name = ActionConfig.ActionType.TRANSITION.type, startTime = Instant.now().toEpochMilli(), index = 0, - failed = false, consumedRetries = 0, lastRetryTime = null, actionProperties = null), actionMetaDataMap) - ) - ), getExplainMap(index), false) + failed = false, consumedRetries = 0, lastRetryTime = null, actionProperties = null), actionMetaDataMap) + ) + ), getExplainMap(index), false) + } } fun `test change policy API should only apply to indices in the state filter`() { @@ -433,7 +439,7 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { val newPolicy = createRandomPolicy() val changePolicy = ChangePolicy(newPolicy.id, null, listOf(StateFilter(state = firstState.name)), false) val response = client().makeRequest(RestRequest.Method.POST.toString(), - "${RestChangePolicyAction.CHANGE_POLICY_BASE_URI}/$firstIndex,$secondIndex", emptyMap(), changePolicy.toHttpEntity()) + "${RestChangePolicyAction.CHANGE_POLICY_BASE_URI}/$firstIndex,$secondIndex", emptyMap(), changePolicy.toHttpEntity()) val expectedResponse = mapOf( FAILURES to false, FAILED_INDICES to emptyList(), @@ -443,12 +449,12 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { waitFor { // The first managed index should not have a change policy added to it as it should of been filtered out from the states filter - val nextFirstManagedIndexConfig = getManagedIndexConfig(firstIndex) + val nextFirstManagedIndexConfig = getManagedIndexConfigByDocId(firstManagedIndexConfig.id) assertNotNull("Next first managed index config is null", nextFirstManagedIndexConfig) assertNull("Next first change policy is not null", nextFirstManagedIndexConfig!!.changePolicy) // The second managed index should have a change policy added to it - val nextSecondManagedIndexConfig = getManagedIndexConfig(secondIndex) + val nextSecondManagedIndexConfig = getManagedIndexConfigByDocId(secondManagedIndexConfig.id) assertNotNull("Next second managed index config is null", nextSecondManagedIndexConfig) assertNotNull("Next second change policy is null", nextSecondManagedIndexConfig!!.changePolicy) } @@ -479,7 +485,7 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { // The initialized policy should be the change policy one val updatedManagedIndexConfig: ManagedIndexConfig = waitFor { - val config = getManagedIndexConfig(index) + val config = getManagedIndexConfigByDocId(managedIndexConfig.id) assertNotNull("Updated managed index config is null", config) assertNull("Updated change policy is not null", config!!.changePolicy) assertEquals("Initialized policyId is not the change policy id", newPolicy.id, config.policyID) @@ -491,15 +497,17 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { } // should expect to see us starting in the state mentioned in changepolicy - assertPredicatesOnMetaData( - listOf( - index to listOf( - ManagedIndexMetaData.INDEX_UUID to updatedManagedIndexConfig.indexUuid::equals, - ManagedIndexMetaData.POLICY_ID to newPolicy.id::equals, - StateMetaData.STATE to fun(stateMetaDataMap: Any?): Boolean = + waitFor { + assertPredicatesOnMetaData( + listOf( + index to listOf( + ManagedIndexMetaData.INDEX_UUID to updatedManagedIndexConfig.indexUuid::equals, + ManagedIndexMetaData.POLICY_ID to newPolicy.id::equals, + StateMetaData.STATE to fun(stateMetaDataMap: Any?): Boolean = assertStateEquals(StateMetaData("some_other_state", Instant.now().toEpochMilli()), stateMetaDataMap) - ) - ), getExplainMap(index), false) + ) + ), getExplainMap(index), false) + } } fun `test allowing change policy to happen in middle of state if same state structure`() { @@ -545,15 +553,15 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { assertAffectedIndicesResponseIsEqual(expectedResponse, response.asMap()) // the change policy REST API should of set safe to true as the policies have the same state/actions - waitFor { assertEquals(true, getManagedIndexConfig(indexName)?.changePolicy?.isSafe) } + waitFor { assertEquals(true, getManagedIndexConfigByDocId(managedIndexConfig.id)?.changePolicy?.isSafe) } // speed up to next execution where we should swap the policy even while in the middle of the // rollover action and fix our minDocs being too high updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { - assertNull(getManagedIndexConfig(indexName)?.changePolicy) - assertEquals(newPolicy.id, getManagedIndexConfig(indexName)?.policyID) + assertNull(getManagedIndexConfigByDocId(managedIndexConfig.id)?.changePolicy) + assertEquals(newPolicy.id, getManagedIndexConfigByDocId(managedIndexConfig.id)?.policyID) assertEquals(newPolicy.id, getExplainManagedIndexMetaData(indexName).policyID) } @@ -596,7 +604,7 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { assertAffectedIndicesResponseIsEqual(expectedResponse, response.asMap()) // the change policy REST API should set the ChangePolicy on the config job - waitFor { assertEquals(policy.id, getManagedIndexConfig(indexName)?.changePolicy?.policyID) } + waitFor { assertEquals(policy.id, getManagedIndexConfigByDocId(managedIndexConfig.id)?.changePolicy?.policyID) } // retry failed index val retryResponse = client().makeRequest( @@ -615,9 +623,9 @@ class RestChangePolicyActionIT : IndexStateManagementRestTestCase() { updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { - assertNull(getManagedIndexConfig(indexName)?.changePolicy) - assertNotNull(getManagedIndexConfig(indexName)?.policy) - assertEquals(policy.id, getManagedIndexConfig(indexName)?.policyID) + assertNull(getManagedIndexConfigByDocId(managedIndexConfig.id)?.changePolicy) + assertNotNull(getManagedIndexConfigByDocId(managedIndexConfig.id)?.policy) + assertEquals(policy.id, getManagedIndexConfigByDocId(managedIndexConfig.id)?.policyID) assertEquals(policy.id, getExplainManagedIndexMetaData(indexName).policyID) assertEquals("Successfully initialized policy: ${policy.id}", getExplainManagedIndexMetaData(indexName).info?.get("message")) } diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/runner/ManagedIndexRunnerIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/runner/ManagedIndexRunnerIT.kt index 4c9ce902b..1dc58ea5b 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/runner/ManagedIndexRunnerIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/runner/ManagedIndexRunnerIT.kt @@ -100,14 +100,14 @@ class ManagedIndexRunnerIT : IndexStateManagementRestTestCase() { // init policy updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals(createdPolicy.id, getManagedIndexConfig(indexName)?.policyID) } + waitFor { assertEquals(createdPolicy.id, getManagedIndexConfigByDocId(managedIndexConfig.id)?.policyID) } // change cluster job interval setting to 2 (minutes) updateClusterSetting(ManagedIndexSettings.JOB_INTERVAL.key, "2") // fast forward to next execution where at the end we should change the job interval time updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { (getManagedIndexConfig(indexName)?.jobSchedule as? IntervalSchedule)?.interval == 2 } + waitFor { (getManagedIndexConfigByDocId(managedIndexConfig.id)?.jobSchedule as? IntervalSchedule)?.interval == 2 } } fun `test allow list fails execution`() {