Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Backports rollover conditions and force merge bug, action messaging, tests #277

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build-tools/esplugin-coverage.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ integTestCluster {
jacocoTestReport {
dependsOn integTest, test
executionData dummyTest.jacoco.destinationFile, dummyIntegTest.jacoco.destinationFile
sourceDirectories = sourceSets.main.allSource
classDirectories = sourceSets.main.output
sourceDirectories.from = "src/main/kotlin"
classDirectories.from = sourceSets.main.output
reports {
html.enabled = true // human readable
xml.enabled = true // for coverlay
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ object ManagedIndexRunner : ScheduledJobRunner,

if (updateResult && state != null && action != null && step != null && currentActionMetaData != null) {
// Step null check is done in getStartingManagedIndexMetaData
step.execute()
step.preExecute(logger).execute().postExecute(logger)
var executedManagedIndexMetaData = startingManagedIndexMetaData.getCompletedManagedIndexMetaData(action, step)

if (executedManagedIndexMetaData.isFailed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import com.amazon.opendistroforelasticsearch.jobscheduler.spi.utils.LockService
import kotlinx.coroutines.delay
import org.apache.logging.log4j.Logger
import org.elasticsearch.ElasticsearchException
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.bulk.BackoffPolicy
import org.elasticsearch.action.support.DefaultShardOperationFailedException
import org.elasticsearch.client.ElasticsearchClient
import org.elasticsearch.cluster.metadata.IndexMetaData
import org.elasticsearch.common.bytes.BytesReference
Expand All @@ -36,6 +38,7 @@ import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParserUtils
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.transport.RemoteTransportException
import java.time.Instant
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
Expand Down Expand Up @@ -202,9 +205,7 @@ fun IndexMetaData.getRolloverAlias(): String? {
fun IndexMetaData.getClusterStateManagedIndexConfig(): ClusterStateManagedIndexConfig? {
val index = this.index.name
val uuid = this.index.uuid
val policyID = this.getPolicyID()

if (policyID == null) return null
val policyID = this.getPolicyID() ?: return null

return ClusterStateManagedIndexConfig(index = index, uuid = uuid, policyID = policyID)
}
Expand All @@ -217,3 +218,13 @@ fun IndexMetaData.getManagedIndexMetaData(): ManagedIndexMetaData? {
}
return null
}

/**
 * Searches this throwable and then its chain of causes for a [RemoteTransportException].
 *
 * @return this throwable if it is itself a [RemoteTransportException]; otherwise the first one found
 * while following [Throwable.cause], or null when the chain ends without a match.
 */
fun Throwable.findRemoteTransportException(): RemoteTransportException? =
    (this as? RemoteTransportException) ?: cause?.findRemoteTransportException()

/**
 * Produces a string describing why this shard operation failed.
 *
 * When the failure's cause chain contains a [RemoteTransportException], the result is the string form of
 * what [ExceptionsHelper.unwrapCause] returns for it; otherwise it is this failure's own `toString()`.
 */
fun DefaultShardOperationFailedException.getUsefulCauseString(): String {
    // No remote transport wrapper anywhere in the chain: fall back to the failure itself.
    val remoteException = cause?.findRemoteTransportException() ?: return toString()
    return ExceptionsHelper.unwrapCause(remoteException).toString()
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ data class SweptManagedIndexConfig(
) {

companion object {
@Suppress("ComplexMethod")
@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser, seqNo: Long, primaryTerm: Long): SweptManagedIndexConfig {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package com.amazon.opendistroforelasticsearch.indexstatemanagement.step

import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData
import org.apache.logging.log4j.Logger
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.common.io.stream.Writeable
Expand All @@ -25,7 +26,17 @@ import java.util.Locale

abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMetaData, val isSafeToDisableOn: Boolean = true) {

abstract suspend fun execute()
/**
 * Logs at info level that this step is about to run for the managed index.
 *
 * @param logger the logger the message is written to
 * @return this step, so the call can be chained
 */
fun preExecute(logger: Logger): Step = also {
    logger.info("Executing $name for ${managedIndexMetaData.index}")
}

abstract suspend fun execute(): Step

/**
 * Logs at info level that this step has finished running for the managed index.
 *
 * @param logger the logger the message is written to
 * @return this step, so the call can be chained
 */
fun postExecute(logger: Logger): Step = also {
    logger.info("Finished executing $name for ${managedIndexMetaData.index}")
}

abstract fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData

Expand All @@ -44,9 +55,7 @@ abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMeta
*/
abstract fun isIdempotent(): Boolean

fun getStartingStepMetaData(): StepMetaData {
return StepMetaData(name, getStepStartTime().toEpochMilli(), StepStatus.STARTING)
}
/**
 * Builds the [StepMetaData] for this step in its starting state: the step's name, the epoch-millisecond
 * value of [getStepStartTime], and [StepStatus.STARTING].
 */
fun getStartingStepMetaData(): StepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), StepStatus.STARTING)

fun getStepStartTime(): Instant {
if (managedIndexMetaData.stepMetaData == null || managedIndexMetaData.stepMetaData.name != this.name) {
Expand All @@ -55,6 +64,8 @@ abstract class Step(val name: String, val managedIndexMetaData: ManagedIndexMeta
return Instant.ofEpochMilli(managedIndexMetaData.stepMetaData.startTime)
}

protected val indexName: String = managedIndexMetaData.index

enum class StepStatus(val status: String) : Writeable {
STARTING("starting"),
CONDITION_NOT_MET("condition_not_met"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.C
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.snapshots.SnapshotInProgressException
import org.elasticsearch.transport.RemoteTransportException

class AttemptCloseStep(
val clusterService: ClusterService,
Expand All @@ -41,34 +43,52 @@ class AttemptCloseStep(
override fun isIdempotent() = true

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
val index = managedIndexMetaData.index
override suspend fun execute(): AttemptCloseStep {
try {
logger.info("Executing close on $index")
val closeIndexRequest = CloseIndexRequest()
.indices(index)
.indices(indexName)

val response: AcknowledgedResponse = client.admin().indices().suspendUntil { close(closeIndexRequest, it) }
logger.info("Close index for $index was acknowledged=${response.isAcknowledged}")
if (response.isAcknowledged) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to "Successfully closed index")
info = mapOf("message" to getSuccessMessage(indexName))
} else {
val message = getFailedMessage(indexName)
logger.warn(message)
stepStatus = StepStatus.FAILED
info = mapOf("message" to "Failed to close index")
info = mapOf("message" to message)
}
} catch (e: RemoteTransportException) {
val cause = ExceptionsHelper.unwrapCause(e)
if (cause is SnapshotInProgressException) {
handleSnapshotException(cause)
} else {
handleException(cause as Exception)
}
} catch (e: SnapshotInProgressException) {
logger.warn("Failed to close index [index=$index] with snapshot in progress")
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to "Index had snapshot in progress, retrying closing")
handleSnapshotException(e)
} catch (e: Exception) {
logger.error("Failed to set index to close [index=$index]", e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to "Failed to set index to close")
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
handleException(e)
}

return this
}

/**
 * Records a close attempt that hit a snapshot in progress: logs the snapshot message together with
 * the exception at warn level, marks the step as [StepStatus.CONDITION_NOT_MET], and stores the
 * message under the "message" key of the step info.
 */
private fun handleSnapshotException(e: SnapshotInProgressException) {
    getSnapshotMessage(indexName).also { snapshotMessage ->
        logger.warn(snapshotMessage, e)
        stepStatus = StepStatus.CONDITION_NOT_MET
        info = mapOf("message" to snapshotMessage)
    }
}

/**
 * Records a failed close attempt: logs the failure message with the exception at error level, marks
 * the step as [StepStatus.FAILED], and stores the message in the step info. The exception's own
 * message, when it has one, is added under the "cause" key.
 */
private fun handleException(e: Exception) {
    val failureMessage = getFailedMessage(indexName)
    logger.error(failureMessage, e)
    stepStatus = StepStatus.FAILED
    info = mutableMapOf("message" to failureMessage)
        .apply { e.message?.let { put("cause", it) } }
        .toMap()
}

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
Expand All @@ -78,4 +98,10 @@ class AttemptCloseStep(
info = info
)
}

companion object {
/** Builds the message used when closing [index] did not succeed. */
fun getFailedMessage(index: String): String {
    return "Failed to close index [index=$index]"
}
/** Builds the message used when closing [index] was acknowledged. */
fun getSuccessMessage(index: String): String {
    return "Successfully closed index [index=$index]"
}
/** Builds the message used when closing [index] was blocked by a snapshot in progress. */
fun getSnapshotMessage(index: String): String {
    return "Index had snapshot in progress, retrying closing [index=$index]"
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.D
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.snapshots.SnapshotInProgressException
import org.elasticsearch.transport.RemoteTransportException
import java.lang.Exception

class AttemptDeleteStep(
Expand All @@ -42,30 +44,51 @@ class AttemptDeleteStep(
override fun isIdempotent() = true

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
override suspend fun execute(): AttemptDeleteStep {
try {
val response: AcknowledgedResponse = client.admin().indices()
.suspendUntil { delete(DeleteIndexRequest(managedIndexMetaData.index), it) }
.suspendUntil { delete(DeleteIndexRequest(indexName), it) }

if (response.isAcknowledged) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to "Deleted index")
info = mapOf("message" to getSuccessMessage(indexName))
} else {
val message = getFailedMessage(indexName)
logger.warn(message)
stepStatus = StepStatus.FAILED
info = mapOf("message" to "Failed to delete index")
info = mapOf("message" to message)
}
} catch (e: RemoteTransportException) {
val cause = ExceptionsHelper.unwrapCause(e)
if (cause is SnapshotInProgressException) {
handleSnapshotException(cause)
} else {
handleException(cause as Exception)
}
} catch (e: SnapshotInProgressException) {
logger.warn("Failed to delete index [index=${managedIndexMetaData.index}] with snapshot in progress")
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to "Index had snapshot in progress, retrying deletion")
handleSnapshotException(e)
} catch (e: Exception) {
logger.error("Failed to delete index [index=${managedIndexMetaData.index}]", e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to "Failed to delete index")
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
handleException(e)
}

return this
}

/**
 * Records a delete attempt that hit a snapshot in progress: logs the snapshot message together with
 * the exception at warn level, marks the step as [StepStatus.CONDITION_NOT_MET], and stores the
 * message under the "message" key of the step info.
 */
private fun handleSnapshotException(e: SnapshotInProgressException) {
    getSnapshotMessage(indexName).also { snapshotMessage ->
        logger.warn(snapshotMessage, e)
        stepStatus = StepStatus.CONDITION_NOT_MET
        info = mapOf("message" to snapshotMessage)
    }
}

/**
 * Records a failed delete attempt: logs the failure message with the exception at error level, marks
 * the step as [StepStatus.FAILED], and stores the message in the step info. The exception's own
 * message, when it has one, is added under the "cause" key.
 */
private fun handleException(e: Exception) {
    val failureMessage = getFailedMessage(indexName)
    logger.error(failureMessage, e)
    stepStatus = StepStatus.FAILED
    info = mutableMapOf("message" to failureMessage)
        .apply { e.message?.let { put("cause", it) } }
        .toMap()
}

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
Expand All @@ -78,5 +101,8 @@ class AttemptDeleteStep(

companion object {
const val name = "attempt_delete"
/** Builds the message used when deleting [index] did not succeed. */
fun getFailedMessage(index: String): String {
    return "Failed to delete index [index=$index]"
}
/** Builds the message used when deleting [index] was acknowledged. */
fun getSuccessMessage(index: String): String {
    return "Successfully deleted index [index=$index]"
}
/** Builds the message used when deleting [index] was blocked by a snapshot in progress. */
fun getSnapshotMessage(index: String): String {
    return "Index had snapshot in progress, retrying deletion [index=$index]"
}
}
}
Loading