Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Backports bug fixes/improvements to opendistro-1.0 (#139)
Browse files Browse the repository at this point in the history
* Adds null check in cluster changed event sweep (#125)

* Fixes history index iteration breaking instead of continuing (#134)

* Fixes issue where action timeout was using start_time from previous action (#133)

* Removes transitive dependencies for notification (#105)

* Removes transitive dependencies for notification

* Removes resolution strategy for removed transitive dependencies and adds excludes for testCompile because of jarHell with elasticsearch.test:framework

* Remove excludes in test dependencies (#108)

* Updates plugin version

* Updates usage of LockService to new async API (#107)

* Updates job scheduler test resource that contains async lock service
  • Loading branch information
dbbaughe authored Jan 17, 2020
1 parent 871e12a commit 158a21e
Show file tree
Hide file tree
Showing 8 changed files with 83 additions and 26 deletions.
18 changes: 2 additions & 16 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -71,28 +71,14 @@ detekt {
buildUponDefaultConfig = true
}

// Applies a dependency resolution strategy to every configuration in the build.
configurations.all {
// Return early for any configuration whose state is not UNRESOLVED.
if (it.state != Configuration.State.UNRESOLVED) return
resolutionStrategy {
// Force these libraries to the versions read from `versions.*`.
force "commons-logging:commons-logging:${versions.commonslogging}"
force "org.apache.httpcomponents:httpcore:${versions.httpcore}"
force "commons-codec:commons-codec:${versions.commonscodec}"
}
}

dependencies {
compileOnly "org.elasticsearch:elasticsearch:${es_version}"
compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:1.0.0.1"
compileOnly "com.amazon.opendistroforelasticsearch:opendistro-job-scheduler-spi:1.0.0.2"
compile "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
compile "org.jetbrains.kotlin:kotlin-stdlib-common:${kotlin_version}"
compile 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.2.1'
compile "org.jetbrains:annotations:13.0"
compile "com.amazon.opendistroforelasticsearch:notification:1.0.0.0"
// alerting-notification transitive dependencies
compile "org.apache.httpcomponents:httpcore:4.4.5"
compile "org.apache.httpcomponents:httpclient:4.5.7"
compile "commons-logging:commons-logging:1.2"
compile "commons-codec:commons-codec:1.11"

testCompile "org.elasticsearch.test:framework:${es_version}"
testCompile "org.jetbrains.kotlin:kotlin-test:${kotlin_version}"
Expand All @@ -114,7 +100,7 @@ ext {
}

group = "com.amazon.opendistroforelasticsearch"
version = "${opendistroVersion}.1"
version = "${opendistroVersion}.2"

if (isSnapshot) {
version += "-SNAPSHOT"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ class IndexStateManagementHistory(
val alias = indexMetaData.aliases.firstOrNull { IndexStateManagementIndices.HISTORY_WRITE_INDEX_ALIAS == it.value.alias }
if (alias != null && historyEnabled) {
// If the index has the history write alias and history is enabled, don't delete the index.
break
continue
}

indexToDelete.add(indexMetaData.index.name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ class ManagedIndexCoordinator(
@OpenForTesting
suspend fun sweepClusterChangedEvent(event: ClusterChangedEvent) {
val indicesDeletedRequests = event.indicesDeleted()
.filter { event.previousState().metaData().index(it).getPolicyID() != null }
.filter { event.previousState().metaData().index(it)?.getPolicyID() != null }
.map { deleteManagedIndexRequest(it.uuid) }

/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getStartingManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getStateToExecute
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getCompletedManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.getUpdatedActionMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.hasDifferentJobInterval
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.hasTimedOut
import com.amazon.opendistroforelasticsearch.indexstatemanagement.util.hasVersionConflict
Expand Down Expand Up @@ -174,13 +175,13 @@ object ManagedIndexRunner : ScheduledJobRunner,

launch {
// Attempt to acquire lock
val lock: LockModel? = withContext(Dispatchers.IO) { context.lockService.acquireLock(job, context) }
val lock: LockModel? = context.lockService.suspendUntil { acquireLock(job, context, it) }
if (lock == null) {
logger.debug("Could not acquire lock for ${job.index}")
} else {
runManagedIndexConfig(job)
// Release lock
val released = withContext(Dispatchers.IO) { context.lockService.release(lock) }
val released: Boolean = context.lockService.suspendUntil { release(lock, it) }
if (!released) {
logger.debug("Could not release lock for ${job.index}")
}
Expand Down Expand Up @@ -233,6 +234,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
val state = policy.getStateToExecute(managedIndexMetaData)
val action: Action? = state?.getActionToExecute(clusterService, scriptService, client, managedIndexMetaData)
val step: Step? = action?.getStepToExecute()
val currentActionMetaData = action?.getUpdatedActionMetaData(managedIndexMetaData, state)

// If Index State Management is disabled and the current step is not null and safe to disable on
// then disable the job and return early
Expand All @@ -241,11 +243,11 @@ object ManagedIndexRunner : ScheduledJobRunner,
return
}

if (action?.hasTimedOut(managedIndexMetaData.actionMetaData) == true) {
if (action?.hasTimedOut(currentActionMetaData) == true) {
val info = mapOf("message" to "Action timed out")
logger.error("Action=${action.type.type} has timed out")
val updated = updateManagedIndexMetaData(managedIndexMetaData
.copy(actionMetaData = managedIndexMetaData.actionMetaData?.copy(failed = true), info = info))
.copy(actionMetaData = currentActionMetaData?.copy(failed = true), info = info))
if (updated) disableManagedIndexConfig(managedIndexConfig)
return
}
Expand All @@ -255,7 +257,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
return
}

val shouldBackOff = action?.shouldBackoff(managedIndexMetaData.actionMetaData, action.config.configRetry)
val shouldBackOff = action?.shouldBackoff(currentActionMetaData, action.config.configRetry)
if (shouldBackOff?.first == true) {
// If we should back off then exit early.
logger.info("Backoff for retrying. Remaining time ${shouldBackOff.second}")
Expand All @@ -282,9 +284,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
val startingManagedIndexMetaData = managedIndexMetaData.getStartingManagedIndexMetaData(state, action, step)
val updateResult = updateManagedIndexMetaData(startingManagedIndexMetaData)

val actionMetaData = startingManagedIndexMetaData.actionMetaData

if (updateResult && state != null && action != null && step != null && actionMetaData != null) {
if (updateResult && state != null && action != null && step != null && currentActionMetaData != null) {
// Step null check is done in getStartingManagedIndexMetaData
step.execute()
var executedManagedIndexMetaData = startingManagedIndexMetaData.getCompletedManagedIndexMetaData(action, step)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.settings.ManagedIndexSettings
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.utils.LockService
import kotlinx.coroutines.delay
import org.apache.logging.log4j.Logger
import org.elasticsearch.ElasticsearchException
Expand Down Expand Up @@ -124,6 +125,20 @@ suspend fun <C : ElasticsearchClient, T> C.suspendUntil(block: C.(ActionListener
})
}

/**
 * Bridges the callback-based [LockService] API into a kotlin suspending function.
 *
 * The calling coroutine is suspended while [block] runs with a freshly created [ActionListener]. It resumes with
 * the value handed to `onResponse`, or resumes with the exception handed to `onFailure`.
 *
 * @param block - invoked with this [LockService] as receiver; it must pass the supplied [ActionListener] on to the
 * LockService API being called.
 * @return the response delivered to the listener.
 */
suspend fun <T> LockService.suspendUntil(block: LockService.(ActionListener<T>) -> Unit): T =
    suspendCoroutine { continuation ->
        val listener = object : ActionListener<T> {
            override fun onResponse(response: T) {
                continuation.resume(response)
            }

            override fun onFailure(e: Exception) {
                continuation.resumeWithException(e)
            }
        }
        block(listener)
    }

/**
* Compares current and previous IndexMetaData to determine if we should create [ManagedIndexConfig].
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,60 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
)
}
}

// https://github.com/opendistro-for-elasticsearch/index-management/issues/130
/**
 * Regression test: an action's timeout must be evaluated against that action's own start time,
 * not the start time recorded for the previous action.
 *
 * The policy below has a single state with two actions: `open` (timeout 5s) followed by
 * `rollover` (timeout 1s, min_doc_count 100). The test drives three job executions and sleeps
 * past the first action's timeout before the third one.
 */
fun `test action timeout doesn't bleed over into next action`() {
val indexName = "${testIndexName}_index_1"
val policyID = "${testIndexName}_testPolicyName_1"
val testPolicy = """
{"policy":{"description":"Default policy","default_state":"rolloverstate","states":[
{"name":"rolloverstate","actions":[{"timeout": "5s","open":{}},{"timeout":"1s","rollover":{"min_doc_count":100}}],
"transitions":[]}]}}
""".trimIndent()

createPolicyJson(testPolicy, policyID)

// Create the index with the policy attached and an alias named "some_alias".
createIndex(indexName, policyID, "some_alias")

val managedIndexConfig = getExistingManagedIndexConfig(indexName)
// Change the start time so the job will trigger in 2 seconds.
// First execution: we need to initialize the policy.
updateManagedIndexConfigStartTime(managedIndexConfig)

// Wait until the explain output reports the expected policy ID.
waitFor {
assertPredicatesOnMetaData(
listOf(indexName to listOf(ManagedIndexMetaData.POLICY_ID to policyID::equals)),
getExplainMap(indexName),
strict = false
)
}

// Second execution: we move into the open action. We won't hit the timeout here because this is the
// execution that sets the startTime.
updateManagedIndexConfigStartTime(managedIndexConfig)

val expectedOpenInfoString = mapOf("message" to "Successfully opened index").toString()
// Wait until the info field matches the open action's success message.
waitFor {
assertPredicatesOnMetaData(
listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedOpenInfoString == info.toString())),
getExplainMap(indexName),
strict = false
)
}

// Wait 5 seconds so that the timeout configured on the first (open) action has passed.
Thread.sleep(5000L)

// Third execution: we move into the rollover action. We should not hit the timeout yet because it's the
// first execution of rollover. Previously there was a bug where it would use the startTime from the
// previous action's metadata and fail immediately.
updateManagedIndexConfigStartTime(managedIndexConfig)

val expectedRolloverInfoString = mapOf("message" to "Attempting to rollover").toString()
// The rollover action must report that it is attempting to roll over rather than a timeout failure.
waitFor {
assertPredicatesOnMetaData(
listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedRolloverInfoString == info.toString())),
getExplainMap(indexName),
strict = false
)
}
}
}
Binary file not shown.
Binary file not shown.

0 comments on commit 158a21e

Please sign in to comment.