Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Backport bug fixes/improvements to opendistro-1.1 #136

Merged
18 changes: 2 additions & 16 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -71,28 +71,14 @@ detekt {
buildUponDefaultConfig = true
}

configurations.all {
if (it.state != Configuration.State.UNRESOLVED) return
resolutionStrategy {
force "commons-logging:commons-logging:${versions.commonslogging}"
force "org.apache.httpcomponents:httpcore:${versions.httpcore}"
force "commons-codec:commons-codec:${versions.commonscodec}"
}
}

dependencies {
compileOnly "org.elasticsearch:elasticsearch:${es_version}"
compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:1.1.0.1"
compile "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
compile "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1'
compile "org.jetbrains:annotations:13.0"
compile "com.amazon.opendistroforelasticsearch:notification:1.1.0.0"
// alerting-notification transitive dependencies
compile "org.apache.httpcomponents:httpcore:4.4.5"
compile "org.apache.httpcomponents:httpclient:4.5.7"
compile "commons-logging:commons-logging:1.2"
compile "commons-codec:commons-codec:1.11"
compile "com.amazon.opendistroforelasticsearch:notification:1.1.0.1"

testCompile "org.elasticsearch.test:framework:${es_version}"
testCompile "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
Expand All @@ -114,7 +100,7 @@ ext {
}

group = "com.amazon.opendistroforelasticsearch"
version = "${opendistroVersion}.1"
version = "${opendistroVersion}.2"

if (isSnapshot) {
version += "-SNAPSHOT"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class IndexStateManagementHistory(
val alias = indexMetaData.aliases.firstOrNull { IndexStateManagementIndices.HISTORY_WRITE_INDEX_ALIAS == it.value.alias }
if (alias != null && historyEnabled) {
// If index has write alias and history is enable, don't delete the index.
break
continue
}

indexToDelete.add(indexMetaData.index.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ class ManagedIndexCoordinator(
@OpenForTesting
suspend fun sweepClusterChangedEvent(event: ClusterChangedEvent) {
val indicesDeletedRequests = event.indicesDeleted()
.filter { event.previousState().metaData().index(it).getPolicyID() != null }
.filter { event.previousState().metaData().index(it)?.getPolicyID() != null }
.map { deleteManagedIndexRequest(it.uuid) }

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getStartingManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getStateToExecute
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getCompletedManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getUpdatedActionMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.hasDifferentJobInterval
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.hasTimedOut
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.hasVersionConflict
Expand Down Expand Up @@ -233,6 +234,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
val state = policy.getStateToExecute(managedIndexMetaData)
val action: Action? = state?.getActionToExecute(clusterService, scriptService, client, managedIndexMetaData)
val step: Step? = action?.getStepToExecute()
val currentActionMetaData = action?.getUpdatedActionMetaData(managedIndexMetaData, state)

// If Index State Management is disabled and the current step is not null and safe to disable on
// then disable the job and return early
Expand All @@ -241,11 +243,11 @@ object ManagedIndexRunner : ScheduledJobRunner,
return
}

if (action?.hasTimedOut(managedIndexMetaData.actionMetaData) == true) {
if (action?.hasTimedOut(currentActionMetaData) == true) {
val info = mapOf("message" to "Action timed out")
logger.error("Action=${action.type.type} has timed out")
val updated = updateManagedIndexMetaData(managedIndexMetaData
.copy(actionMetaData = managedIndexMetaData.actionMetaData?.copy(failed = true), info = info))
.copy(actionMetaData = currentActionMetaData?.copy(failed = true), info = info))
if (updated) disableManagedIndexConfig(managedIndexConfig)
return
}
Expand All @@ -255,7 +257,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
return
}

val shouldBackOff = action?.shouldBackoff(managedIndexMetaData.actionMetaData, action.config.configRetry)
val shouldBackOff = action?.shouldBackoff(currentActionMetaData, action.config.configRetry)
if (shouldBackOff?.first == true) {
// If we should back off then exit early.
logger.info("Backoff for retrying. Remaining time ${shouldBackOff.second}")
Expand All @@ -282,9 +284,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
val startingManagedIndexMetaData = managedIndexMetaData.getStartingManagedIndexMetaData(state, action, step)
val updateResult = updateManagedIndexMetaData(startingManagedIndexMetaData)

val actionMetaData = startingManagedIndexMetaData.actionMetaData

if (updateResult && state != null && action != null && step != null && actionMetaData != null) {
if (updateResult && state != null && action != null && step != null && currentActionMetaData != null) {
// Step null check is done in getStartingManagedIndexMetaData
step.execute()
var executedManagedIndexMetaData = startingManagedIndexMetaData.getCompletedManagedIndexMetaData(action, step)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ data class ManagedIndexConfig(
policyID = requireNotNull(policyID) { "ManagedIndexConfig policy id is null" },
policySeqNo = policySeqNo,
policyPrimaryTerm = policyPrimaryTerm,
policy = policy,
policy = policy?.copy(seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO,
primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM),
changePolicy = changePolicy
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,22 +218,28 @@ fun RolloverActionConfig.evaluateConditions(
numDocs: Long,
indexSize: ByteSizeValue
): Boolean {
if (this.minDocs == null &&
this.minAge == null &&
this.minSize == null) {
// If no conditions specified we default to true
return true
}

if (this.minDocs != null) {
return this.minDocs <= numDocs
if (this.minDocs <= numDocs) return true
}

if (this.minAge != null) {
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDate.toEpochMilli()
return this.minAge.millis <= elapsedTime
if (this.minAge.millis <= elapsedTime) return true
}

if (this.minSize != null) {
return this.minSize <= indexSize
if (this.minSize <= indexSize) return true
}

// If no conditions specified we default to true
return true
// return false if non of the conditions were true.
return false
}

fun Policy.getStateToExecute(managedIndexMetaData: ManagedIndexMetaData): State? {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,60 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
)
}
}

// https://github.com/opendistro-for-elasticsearch/index-management/issues/130
fun `test action timeout doesn't bleed over into next action`() {
val indexName = "${testIndexName}_index_1"
val policyID = "${testIndexName}_testPolicyName_1"
val testPolicy = """
{"policy":{"description":"Default policy","default_state":"rolloverstate","states":[
{"name":"rolloverstate","actions":[{"timeout": "5s","open":{}},{"timeout":"1s","rollover":{"min_doc_count":100}}],
"transitions":[]}]}}
""".trimIndent()

createPolicyJson(testPolicy, policyID)

createIndex(indexName, policyID, "some_alias")

val managedIndexConfig = getExistingManagedIndexConfig(indexName)
// Change the start time so the job will trigger in 2 seconds.
// First execution. We need to initialize the policy.
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor {
assertPredicatesOnMetaData(
listOf(indexName to listOf(ManagedIndexMetaData.POLICY_ID to policyID::equals)),
getExplainMap(indexName),
strict = false
)
}

// the second execution we move into open action, we won't hit the timeout as this is the execution that sets the startTime
updateManagedIndexConfigStartTime(managedIndexConfig)

val expectedOpenInfoString = mapOf("message" to "Successfully opened index").toString()
waitFor {
assertPredicatesOnMetaData(
listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedOpenInfoString == info.toString())),
getExplainMap(indexName),
strict = false
)
}

// wait 5 seconds for the timeout from the first action to have passed
Thread.sleep(5000L)

// the third execution we move into rollover action, we should not hit the timeout yet because its the first execution of rollover
// but there was a bug before where it would use the startTime from the previous actions metadata and immediately fail
updateManagedIndexConfigStartTime(managedIndexConfig)

val expectedRolloverInfoString = mapOf("message" to "Attempting to rollover").toString()
waitFor {
assertPredicatesOnMetaData(
listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedRolloverInfoString == info.toString())),
getExplainMap(indexName),
strict = false
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement.action

import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementRestTestCase
import com.amazon.opendistroforelasticsearch.indexstatemanagement.makeRequest
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.State
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.RolloverActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomErrorNotification
import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor
import org.apache.http.entity.ContentType
import org.apache.http.entity.StringEntity
import org.elasticsearch.common.unit.ByteSizeUnit
import org.elasticsearch.common.unit.ByteSizeValue
import org.elasticsearch.rest.RestRequest
import org.junit.Assert
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Locale

class RolloverActionIT : IndexStateManagementRestTestCase() {

private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT)

fun `test rollover no condition`() {
val aliasName = "${testIndexName}_alias"
val indexNameBase = "${testIndexName}_index"
val firstIndex = "$indexNameBase-1"
val policyID = "${testIndexName}_testPolicyName_1"
val actionConfig = RolloverActionConfig(null, null, null, 0)
val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf()))
val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)

createPolicy(policy, policyID)
// create index defaults
createIndex(firstIndex, policyID, aliasName)

val managedIndexConfig = getExistingManagedIndexConfig(firstIndex)

// Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(firstIndex).policyID) }

// Need to speed up to second execution where it will trigger the first execution of the action
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals("Index did not rollover.", mapOf("message" to "Rolled over index"), getExplainManagedIndexMetaData(firstIndex).info) }
Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002"))
}

fun `test rollover multi condition byte size`() {
val aliasName = "${testIndexName}_byte_alias"
val indexNameBase = "${testIndexName}_index_byte"
val firstIndex = "$indexNameBase-1"
val policyID = "${testIndexName}_testPolicyName_byte_1"
val actionConfig = RolloverActionConfig(ByteSizeValue(10, ByteSizeUnit.BYTES), 1000000, null, 0)
val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf()))
val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)

createPolicy(policy, policyID)
// create index defaults
createIndex(firstIndex, policyID, aliasName)

val managedIndexConfig = getExistingManagedIndexConfig(firstIndex)

// Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(firstIndex).policyID) }

// Need to speed up to second execution where it will trigger the first execution of the action
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals("Index rollover before it met the condition.", mapOf("message" to "Attempting to rollover"), getExplainManagedIndexMetaData(firstIndex).info) }

client().makeRequest(
RestRequest.Method.PUT.toString(),
"$firstIndex/_doc/1111",
StringEntity("{ \"testkey\": \"some valueaaaaaaa\" }", ContentType.APPLICATION_JSON)
)
client().makeRequest(
RestRequest.Method.PUT.toString(),
"$firstIndex/_doc/2222",
StringEntity("{ \"testkey1\": \"some value1\" }", ContentType.APPLICATION_JSON)
)
client().makeRequest(
RestRequest.Method.PUT.toString(),
"$firstIndex/_doc/3333",
StringEntity("{ \"testkey2\": \"some value2\" }", ContentType.APPLICATION_JSON)
)

// Need to speed up to second execution where it will trigger the first execution of the action
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals("Index did not rollover.", mapOf("message" to "Rolled over index"), getExplainManagedIndexMetaData(firstIndex).info) }
Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002"))
}

fun `test rollover multi condition doc size`() {
val aliasName = "${testIndexName}_doc_alias"
val indexNameBase = "${testIndexName}_index_doc"
val firstIndex = "$indexNameBase-1"
val policyID = "${testIndexName}_testPolicyName_doc_1"
val actionConfig = RolloverActionConfig(ByteSizeValue(10, ByteSizeUnit.TB), 3, null, 0)
val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf()))
val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)

createPolicy(policy, policyID)
// create index defaults
createIndex(firstIndex, policyID, aliasName)

val managedIndexConfig = getExistingManagedIndexConfig(firstIndex)

// Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(firstIndex).policyID) }

// Need to speed up to second execution where it will trigger the first execution of the action
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals("Index rollover before it met the condition.", mapOf("message" to "Attempting to rollover"), getExplainManagedIndexMetaData(firstIndex).info) }

client().makeRequest(
RestRequest.Method.PUT.toString(),
"$firstIndex/_doc/1111",
StringEntity("{ \"testkey\": \"some value\" }", ContentType.APPLICATION_JSON)
)
client().makeRequest(
RestRequest.Method.PUT.toString(),
"$firstIndex/_doc/2222",
StringEntity("{ \"testkey1\": \"some value1\" }", ContentType.APPLICATION_JSON)
)
client().makeRequest(
RestRequest.Method.PUT.toString(),
"$firstIndex/_doc/3333",
StringEntity("{ \"testkey2\": \"some value2\" }", ContentType.APPLICATION_JSON)
)

// Need to speed up to second execution where it will trigger the first execution of the action
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals("Index did not rollover.", mapOf("message" to "Rolled over index"), getExplainManagedIndexMetaData(firstIndex).info) }
Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002"))
}
}
Binary file not shown.