Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Adds rollover conditions into info object #208

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.unit.ByteSizeValue
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.rest.RestStatus
import java.time.Instant

Expand Down Expand Up @@ -67,24 +68,49 @@ class AttemptRolloverStep(
statsResponse ?: return

val indexCreationDate = clusterService.state().metaData().index(index).creationDate
val indexCreationDateInstant = Instant.ofEpochMilli(indexCreationDate)
if (indexCreationDate == -1L) {
val indexAgeTimeValue = if (indexCreationDate == -1L) {
logger.warn("$index had an indexCreationDate=-1L, cannot use for comparison")
// since we cannot use for comparison, we can set it to 0 as minAge will never be <= 0
TimeValue.timeValueMillis(0)
} else {
TimeValue.timeValueMillis(Instant.now().toEpochMilli() - indexCreationDate)
}
val numDocs = statsResponse.primaries.docs?.count ?: 0
val indexSize = ByteSizeValue(statsResponse.primaries.docs?.totalSizeInBytes ?: 0)

if (config.evaluateConditions(indexCreationDateInstant, numDocs, indexSize)) {
val conditions = listOfNotNull(
config.minAge?.let {
RolloverActionConfig.MIN_INDEX_AGE_FIELD to mapOf(
"condition" to it.toString(),
"current" to indexAgeTimeValue.toString(),
"creationDate" to indexCreationDate
)
},
config.minDocs?.let {
RolloverActionConfig.MIN_DOC_COUNT_FIELD to mapOf(
"condition" to it,
"current" to numDocs
)
},
config.minSize?.let {
RolloverActionConfig.MIN_SIZE_FIELD to mapOf(
"condition" to it.toString(),
"current" to indexSize.toString()
)
}
).toMap()

if (config.evaluateConditions(indexAgeTimeValue, numDocs, indexSize)) {
logger.info("$index rollover conditions evaluated to true [indexCreationDate=$indexCreationDate," +
" numDocs=$numDocs, indexSize=${indexSize.bytes}]")
executeRollover(alias)
executeRollover(alias, conditions)
} else {
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to "Attempting to rollover")
info = mapOf("message" to "Attempting to rollover", "conditions" to conditions)
}
}

private suspend fun executeRollover(alias: String) {
@Suppress("ComplexMethod")
private suspend fun executeRollover(alias: String, conditions: Map<String, Map<String, Any?>>) {
try {
val request = RolloverRequest(alias, null)
val response: RolloverResponse = client.admin().indices().suspendUntil { rolloverIndex(request, it) }
Expand All @@ -95,12 +121,18 @@ class AttemptRolloverStep(
// If response isAcknowledged it means the index was created and alias was added to new index
if (response.isAcknowledged) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to "Rolled over index")
info = listOfNotNull(
"message" to "Rolled over index",
if (conditions.isEmpty()) null else "conditions" to conditions // don't show empty conditions object if no conditions specified
).toMap()
} else {
// If the alias update response is NOT acknowledged we will get back isAcknowledged=false
// This means the new index was created but we failed to swap the alias
stepStatus = StepStatus.FAILED
info = mapOf("message" to "New index created (${response.newIndex}), but failed to update alias")
info = listOfNotNull(
"message" to "New index created (${response.newIndex}), but failed to update alias",
if (conditions.isEmpty()) null else "conditions" to conditions // don't show empty conditions object if no conditions specified
).toMap()
}
} catch (e: Exception) {
logger.error("Failed to rollover index [index=${managedIndexMetaData.index}]", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.unit.ByteSizeValue
import org.elasticsearch.common.unit.TimeValue
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.XContentFactory
import org.elasticsearch.index.query.BoolQueryBuilder
Expand Down Expand Up @@ -216,7 +217,7 @@ fun Transition.hasStatsConditions(): Boolean = this.conditions?.docCount != null

@Suppress("ReturnCount")
fun RolloverActionConfig.evaluateConditions(
indexCreationDate: Instant,
indexAgeTimeValue: TimeValue,
numDocs: Long,
indexSize: ByteSizeValue
): Boolean {
Expand All @@ -232,11 +233,7 @@ fun RolloverActionConfig.evaluateConditions(
}

if (this.minAge != null) {
val indexCreationDateMilli = indexCreationDate.toEpochMilli()
if (indexCreationDateMilli != -1L) {
val elapsedTime = Instant.now().toEpochMilli() - indexCreationDateMilli
if (this.minAge.millis <= elapsedTime) return true
}
if (this.minAge.millis <= indexAgeTimeValue.millis) return true
}

if (this.minSize != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedI
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.ActionMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor
import org.hamcrest.collection.IsMapContaining
import java.time.Instant
import java.util.Locale

Expand Down Expand Up @@ -54,13 +55,11 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {

// the second execution we move into rollover action, we won't hit the timeout as this is the execution that sets the startTime
updateManagedIndexConfigStartTime(managedIndexConfig)

val expectedInfoString = mapOf("message" to "Attempting to rollover").toString()
waitFor {
assertPredicatesOnMetaData(
listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedInfoString == info.toString())),
getExplainMap(indexName),
strict = false
assertThat(
"Should be attempting to rollover",
getExplainManagedIndexMetaData(indexName).info,
IsMapContaining.hasEntry("message", "Attempting to rollover" as Any?)
)
}

Expand Down Expand Up @@ -122,13 +121,11 @@ class ActionTimeoutIT : IndexStateManagementRestTestCase() {
// the third execution we move into rollover action, we should not hit the timeout yet because its the first execution of rollover
// but there was a bug before where it would use the startTime from the previous actions metadata and immediately fail
updateManagedIndexConfigStartTime(managedIndexConfig)

val expectedRolloverInfoString = mapOf("message" to "Attempting to rollover").toString()
waitFor {
assertPredicatesOnMetaData(
listOf(indexName to listOf(ManagedIndexMetaData.INFO to fun(info: Any?): Boolean = expectedRolloverInfoString == info.toString())),
getExplainMap(indexName),
strict = false
assertThat(
"Should be attempting to rollover",
getExplainManagedIndexMetaData(indexName).info,
IsMapContaining.hasEntry("message", "Attempting to rollover" as Any?)
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,15 @@
package com.amazon.opendistroforelasticsearch.indexstatemanagement.action

import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementRestTestCase
import com.amazon.opendistroforelasticsearch.indexstatemanagement.makeRequest
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.State
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.RolloverActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomErrorNotification
import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor
import org.apache.http.entity.ContentType
import org.apache.http.entity.StringEntity
import org.elasticsearch.common.unit.ByteSizeUnit
import org.elasticsearch.common.unit.ByteSizeValue
import org.elasticsearch.rest.RestRequest
import org.elasticsearch.common.unit.TimeValue
import org.hamcrest.core.Is.isA
import org.junit.Assert
import java.time.Instant
import java.time.temporal.ChronoUnit
Expand All @@ -36,6 +34,7 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {

private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT)

@Suppress("UNCHECKED_CAST")
fun `test rollover no condition`() {
val aliasName = "${testIndexName}_alias"
val indexNameBase = "${testIndexName}_index"
Expand Down Expand Up @@ -65,10 +64,15 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {

// Need to speed up to second execution where it will trigger the first execution of the action
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals("Index did not rollover.", mapOf("message" to "Rolled over index"), getExplainManagedIndexMetaData(firstIndex).info) }
waitFor {
val info = getExplainManagedIndexMetaData(firstIndex).info as Map<String, Any?>
assertEquals("Index did not rollover.", "Rolled over index", info["message"])
assertNull("Should not have conditions if none specified", info["conditions"])
}
Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002"))
}

@Suppress("UNCHECKED_CAST")
fun `test rollover multi condition byte size`() {
val aliasName = "${testIndexName}_byte_alias"
val indexNameBase = "${testIndexName}_index_byte"
Expand Down Expand Up @@ -98,36 +102,47 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {

// Need to speed up to second execution where it will trigger the first execution of the action
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals("Index rollover before it met the condition.", mapOf("message" to "Attempting to rollover"), getExplainManagedIndexMetaData(firstIndex).info) }

client().makeRequest(
RestRequest.Method.PUT.toString(),
"$firstIndex/_doc/1111",
StringEntity("{ \"testkey\": \"some valueaaaaaaa\" }", ContentType.APPLICATION_JSON)
)
client().makeRequest(
RestRequest.Method.PUT.toString(),
"$firstIndex/_doc/2222",
StringEntity("{ \"testkey1\": \"some value1\" }", ContentType.APPLICATION_JSON)
)
client().makeRequest(
RestRequest.Method.PUT.toString(),
"$firstIndex/_doc/3333",
StringEntity("{ \"testkey2\": \"some value2\" }", ContentType.APPLICATION_JSON)
)
waitFor {
val info = getExplainManagedIndexMetaData(firstIndex).info as Map<String, Any?>
assertEquals("Index rollover before it met the condition.", "Attempting to rollover", info["message"])
val conditions = info["conditions"] as Map<String, Any?>
assertEquals("Did not have exclusively min size and min doc count conditions",
setOf(RolloverActionConfig.MIN_SIZE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD), conditions.keys)
val minSize = conditions[RolloverActionConfig.MIN_SIZE_FIELD] as Map<String, Any?>
val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map<String, Any?>
assertEquals("Did not have min size condition", "10b", minSize["condition"])
assertThat("Did not have min size current", minSize["current"], isA(String::class.java))
assertEquals("Did not have min doc count condition", 1000000, minDocCount["condition"])
assertEquals("Did not have min doc count current", 0, minDocCount["current"])
}

insertSampleData(index = firstIndex, docCount = 5, delay = 0)

// Need to speed up to second execution where it will trigger the first execution of the action
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals("Index did not rollover.", mapOf("message" to "Rolled over index"), getExplainManagedIndexMetaData(firstIndex).info) }
waitFor {
val info = getExplainManagedIndexMetaData(firstIndex).info as Map<String, Any?>
assertEquals("Index did not rollover", "Rolled over index", info["message"])
val conditions = info["conditions"] as Map<String, Any?>
assertEquals("Did not have exclusively min size and min doc count conditions",
setOf(RolloverActionConfig.MIN_SIZE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD), conditions.keys)
val minSize = conditions[RolloverActionConfig.MIN_SIZE_FIELD] as Map<String, Any?>
val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map<String, Any?>
assertEquals("Did not have min size condition", "10b", minSize["condition"])
assertThat("Did not have min size current", minSize["current"], isA(String::class.java))
assertEquals("Did not have min doc count condition", 1000000, minDocCount["condition"])
assertEquals("Did not have min doc count current", 5, minDocCount["current"])
}
Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002"))
}

@Suppress("UNCHECKED_CAST")
fun `test rollover multi condition doc size`() {
val aliasName = "${testIndexName}_doc_alias"
val indexNameBase = "${testIndexName}_index_doc"
val firstIndex = "$indexNameBase-1"
val policyID = "${testIndexName}_testPolicyName_doc_1"
val actionConfig = RolloverActionConfig(ByteSizeValue(10, ByteSizeUnit.TB), 3, null, 0)
val actionConfig = RolloverActionConfig(null, 3, TimeValue.timeValueDays(2), 0)
val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf()))
val policy = Policy(
id = policyID,
Expand All @@ -151,27 +166,37 @@ class RolloverActionIT : IndexStateManagementRestTestCase() {

// Need to speed up to second execution where it will trigger the first execution of the action
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals("Index rollover before it met the condition.", mapOf("message" to "Attempting to rollover"), getExplainManagedIndexMetaData(firstIndex).info) }

client().makeRequest(
RestRequest.Method.PUT.toString(),
"$firstIndex/_doc/1111",
StringEntity("{ \"testkey\": \"some value\" }", ContentType.APPLICATION_JSON)
)
client().makeRequest(
RestRequest.Method.PUT.toString(),
"$firstIndex/_doc/2222",
StringEntity("{ \"testkey1\": \"some value1\" }", ContentType.APPLICATION_JSON)
)
client().makeRequest(
RestRequest.Method.PUT.toString(),
"$firstIndex/_doc/3333",
StringEntity("{ \"testkey2\": \"some value2\" }", ContentType.APPLICATION_JSON)
)
waitFor {
val info = getExplainManagedIndexMetaData(firstIndex).info as Map<String, Any?>
assertEquals("Index rollover before it met the condition.", "Attempting to rollover", info["message"])
val conditions = info["conditions"] as Map<String, Any?>
assertEquals("Did not have exclusively min age and min doc count conditions",
setOf(RolloverActionConfig.MIN_INDEX_AGE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD), conditions.keys)
val minAge = conditions[RolloverActionConfig.MIN_INDEX_AGE_FIELD] as Map<String, Any?>
val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map<String, Any?>
assertEquals("Did not have min age condition", "2d", minAge["condition"])
assertThat("Did not have min age current", minAge["current"], isA(String::class.java))
assertEquals("Did not have min doc count condition", 3, minDocCount["condition"])
assertEquals("Did not have min doc count current", 0, minDocCount["current"])
}

insertSampleData(index = firstIndex, docCount = 5, delay = 0)

// Need to speed up to second execution where it will trigger the first execution of the action
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals("Index did not rollover.", mapOf("message" to "Rolled over index"), getExplainManagedIndexMetaData(firstIndex).info) }
waitFor {
val info = getExplainManagedIndexMetaData(firstIndex).info as Map<String, Any?>
assertEquals("Index did not rollover", "Rolled over index", info["message"])
val conditions = info["conditions"] as Map<String, Any?>
assertEquals("Did not have exclusively min age and min doc count conditions",
setOf(RolloverActionConfig.MIN_INDEX_AGE_FIELD, RolloverActionConfig.MIN_DOC_COUNT_FIELD), conditions.keys)
val minAge = conditions[RolloverActionConfig.MIN_INDEX_AGE_FIELD] as Map<String, Any?>
val minDocCount = conditions[RolloverActionConfig.MIN_DOC_COUNT_FIELD] as Map<String, Any?>
assertEquals("Did not have min age condition", "2d", minAge["condition"])
assertThat("Did not have min age current", minAge["current"], isA(String::class.java))
assertEquals("Did not have min doc count condition", 3, minDocCount["condition"])
assertEquals("Did not have min doc count current", 5, minDocCount["current"])
}
Assert.assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002"))
}
}
Loading