Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Adds support for multi-node run/testing and updates tests #254

Merged
merged 4 commits into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,11 @@ This project currently uses the Notification subproject from the [Alerting plugi

1. `./gradlew build` builds and tests project.
2. `./gradlew run` launches a single node cluster with the index management (and job-scheduler) plugin installed.
3. `./gradlew integTest` launches a single node cluster with the index management (and job-scheduler) plugin installed and runs all integ tests.
4. `./gradlew integTest -Dtests.class=*RestChangePolicyActionIT` runs a single integ class
5. `./gradlew integTest -Dtests.class=*RestChangePolicyActionIT -Dtests.method="test missing index"` runs a single integ test method (remember to quote the test method name if it contains spaces)
3. `./gradlew run -PnumNodes=3` launches a multi-node cluster with the index management (and job-scheduler) plugin installed.
4. `./gradlew integTest` launches a single node cluster with the index management (and job-scheduler) plugin installed and runs all integ tests.
5. `./gradlew integTest -PnumNodes=3` launches a multi-node cluster with the index management (and job-scheduler) plugin installed and runs all integ tests.
6. `./gradlew integTest -Dtests.class=*RestChangePolicyActionIT` runs a single integ class
7. `./gradlew integTest -Dtests.class=*RestChangePolicyActionIT -Dtests.method="test missing index"` runs a single integ test method (remember to quote the test method name if it contains spaces)

When launching a cluster using one of the above commands, logs are placed in `build/testclusters/integTest-0/logs`. Though the logs are teed to the console, in practices it's best to check the actual log file.

Expand Down
38 changes: 36 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ apply plugin: 'elasticsearch.testclusters'
apply plugin: 'io.gitlab.arturbosch.detekt'
apply plugin: 'org.jetbrains.kotlin.jvm'
apply plugin: 'org.jetbrains.kotlin.plugin.allopen'
if (!System.properties.containsKey('tests.rest.cluster') && !System.properties.containsKey('tests.cluster')) {

def usingRemoteCluster = System.properties.containsKey('tests.rest.cluster') || System.properties.containsKey('tests.cluster')
def usingMultiNode = project.properties.containsKey('numNodes')
// Only apply jacoco test coverage if we are running a local single node cluster
if (!usingRemoteCluster && !usingMultiNode) {
apply from: 'build-tools/esplugin-coverage.gradle'
}
check.dependsOn jacocoTestReport
Expand Down Expand Up @@ -143,21 +147,41 @@ test {
systemProperty 'tests.security.manager', 'false'
}

File repo = file("$buildDir/testclusters/repo")
def _numNodes = findProperty('numNodes') as Integer ?: 1
testClusters.integTest {
testDistribution = "OSS"
// Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1
if (_numNodes > 1) numberOfNodes = _numNodes
// When running integration tests it doesn't forward the --debug-jvm to the cluster anymore
// i.e. we have to use a custom property to flag when we want to debug elasticsearch JVM
// since we also support multi node integration tests we increase debugPort per node
if (System.getProperty("cluster.debug") != null) {
jvmArgs('-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=5005')
def debugPort = 5005
nodes.forEach { node ->
node.jvmArgs("-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=*:${debugPort}")
debugPort += 1
}
}
plugin(fileTree("src/test/resources/job-scheduler").getSingleFile())
setting 'path.repo', repo.absolutePath
}

integTest.runner {
systemProperty 'tests.security.manager', 'false'
systemProperty 'java.io.tmpdir', es_tmp_dir.absolutePath
systemProperty 'tests.path.repo', repo.absolutePath
// Tell the test JVM if the cluster JVM is running under a debugger so that tests can use longer timeouts for
// requests. The 'doFirst' delays reading the debug setting on the cluster till execution time.
doFirst {
systemProperty 'cluster.debug', getDebug()
// Set number of nodes system property to be used in tests
systemProperty 'cluster.number_of_nodes', "${_numNodes}"
// There seems to be an issue when running multi node run or integ tasks with unicast_hosts
// not being written, the waitForAllConditions ensures it's written
getClusters().forEach { cluster ->
cluster.waitForAllConditions()
}
}

// The -Dcluster.debug option makes the cluster debuggable; this makes the tests debuggable
Expand All @@ -166,6 +190,16 @@ integTest.runner {
}
}

run {
doFirst {
// There seems to be an issue when running multi node run or integ tasks with unicast_hosts
// not being written, the waitForAllConditions ensures it's written
getClusters().forEach { cluster ->
cluster.waitForAllConditions()
}
}
}

task ktlint(type: JavaExec, group: "verification") {
description = "Check Kotlin code style."
main = "com.pinterest.ktlint.Main"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.http.entity.ContentType.APPLICATION_JSON
import org.apache.http.entity.StringEntity
import org.apache.http.message.BasicHeader
import org.elasticsearch.ElasticsearchParseException
import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.client.Request
import org.elasticsearch.client.Response
Expand Down Expand Up @@ -79,6 +80,7 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() {

private val isDebuggingTest = DisableOnDebug(null).isDebugging
private val isDebuggingRemoteCluster = System.getProperty("cluster.debug", "false")!!.toBoolean()
private val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1

fun Response.asMap(): Map<String, Any> = entityAsMap(this)

Expand Down Expand Up @@ -251,6 +253,17 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() {
}
}

protected fun getManagedIndexConfigByDocId(id: String): ManagedIndexConfig? {
val response = client().makeRequest("GET", "$INDEX_STATE_MANAGEMENT_INDEX/_doc/$id")
assertEquals("Request failed", RestStatus.OK, response.restStatus())
val getResponse = GetResponse.fromXContent(createParser(jsonXContent, response.entity.content))
assertTrue("Did not find managed index config", getResponse.isExists)
return getResponse?.run {
val xcp = createParser(jsonXContent, sourceAsBytesRef)
ManagedIndexConfig.parseWithType(xcp, id, seqNo, primaryTerm)
}
}

@Suppress("UNCHECKED_CAST")
protected fun getHistorySearchResponse(index: String): SearchResponse {
val request = """
Expand Down Expand Up @@ -293,7 +306,8 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() {
val intervalSchedule = (update.jobSchedule as IntervalSchedule)
val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis()
val startTimeMillis = desiredStartTimeMillis ?: Instant.now().toEpochMilli() - millis
val response = client().makeRequest("POST", "$INDEX_STATE_MANAGEMENT_INDEX/_update/${update.id}",
val waitForActiveShards = if (isMultiNode) "all" else "1"
val response = client().makeRequest("POST", "$INDEX_STATE_MANAGEMENT_INDEX/_update/${update.id}?wait_for_active_shards=$waitForActiveShards",
StringEntity(
"{\"doc\":{\"managed_index\":{\"schedule\":{\"interval\":{\"start_time\":" +
"\"$startTimeMillis\"}}}}}",
Expand Down Expand Up @@ -339,23 +353,31 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() {
}
}

// Validate segment count per shard by specifying the min and max it should be
@Suppress("UNCHECKED_CAST")
protected fun getSegmentCount(index: String): Int {
val statsResponse: Map<String, Any> = getStats(index)

// Assert that shard count of stats response is 1 since the stats request being used is at the index level
// (meaning the segment count in the response is aggregated) but segment count for force merge
// (which this method is primarily being used for) is going to be validated per shard
val shardsInfo = statsResponse["_shards"] as Map<String, Int>
assertEquals("Shard count higher than expected", 1, shardsInfo["successful"])

val indicesStats = statsResponse["indices"] as Map<String, Map<String, Map<String, Map<String, Any?>>>>
return indicesStats[index]!!["primaries"]!!["segments"]!!["count"] as Int
protected fun validateSegmentCount(index: String, min: Int? = null, max: Int? = null): Boolean {
if (min == null && max == null) throw IllegalArgumentException("Must provide at least a min or max")
val statsResponse: Map<String, Any> = getShardSegmentStats(index)

val indicesStats = statsResponse["indices"] as Map<String, Map<String, Map<String, List<Map<String, Map<String, Any?>>>>>>
return indicesStats[index]!!["shards"]!!.values.all { list ->
list.filter { it["routing"]!!["primary"] == true }.all {
logger.info("Checking primary shard segments for $it")
if (it["routing"]!!["state"] != "STARTED") {
false
} else {
val count = it["segments"]!!["count"] as Int
if (min != null && count < min) return false
if (max != null && count > max) return false
return true
}
}
}
}

/** Get stats for [index] */
private fun getStats(index: String): Map<String, Any> {
val response = client().makeRequest("GET", "/$index/_stats")
/** Get shard segment stats for [index] */
private fun getShardSegmentStats(index: String): Map<String, Any> {
val response = client().makeRequest("GET", "/$index/_stats/segments?level=shards")

assertEquals("Stats request failed", RestStatus.OK, response.restStatus())

Expand Down Expand Up @@ -440,17 +462,7 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() {
assertEquals("Unable to create a new repository", RestStatus.OK, response.restStatus())
}

@Suppress("UNCHECKED_CAST")
private fun getRepoPath(): String {
val response = client()
.makeRequest(
"GET",
"_nodes",
emptyMap()
)
assertEquals("Unable to get a nodes settings", RestStatus.OK, response.restStatus())
return ((response.asMap()["nodes"] as HashMap<String, HashMap<String, HashMap<String, HashMap<String, Any>>>>).values.first()["settings"]!!["path"]!!["repo"] as List<String>)[0]
}
private fun getRepoPath(): String = System.getProperty("tests.path.repo")

private fun getSnapshotsList(repository: String): List<Any> {
val response = client()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,12 @@ class ActionRetryIT : IndexStateManagementRestTestCase() {

// Second execution is to fail the step once.
updateManagedIndexConfigStartTime(managedIndexConfig)
Thread.sleep(3000)

waitFor { assertEquals(1, getExplainManagedIndexMetaData(indexName).actionMetaData?.consumedRetries) }

// Third execution should not run job since we have the retry backoff.
updateManagedIndexConfigStartTime(managedIndexConfig)
Thread.sleep(3000)
Thread.sleep(5000) // currently there is nothing to compare when backing off so we have to sleep

// Fourth execution should not run job since we have the retry backoff.
updateManagedIndexConfigStartTime(managedIndexConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() {
// Add sample data to increase segment count, passing in a delay to ensure multiple segments get created
insertSampleData(indexName, 3, 1000)

waitFor { assertTrue("Segment count for [$indexName] was less than expected", getSegmentCount(indexName) > 1) }
waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) }

val managedIndexConfig = getExistingManagedIndexConfig(indexName)

Expand All @@ -72,7 +72,7 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() {

// Third execution: Force merge operation is kicked off
updateManagedIndexConfigStartTime(managedIndexConfig)
Thread.sleep(3000)

// verify we set maxNumSegments in action properties when kicking off force merge
waitFor {
assertEquals(
Expand All @@ -84,9 +84,8 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() {

// Fourth execution: Waits for force merge to complete, which will happen in this execution since index is small
updateManagedIndexConfigStartTime(managedIndexConfig)
Thread.sleep(3000)

waitFor { assertEquals("Segment count for [$indexName] after force merge is incorrect", 1, getSegmentCount(indexName)) }
waitFor { assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1)) }
// verify we reset actionproperties at end of forcemerge
waitFor { assertNull("maxNumSegments was not reset", getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties) }
// index should still be readonly after force merge finishes
Expand Down Expand Up @@ -117,7 +116,7 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() {
// Add sample data to increase segment count, passing in a delay to ensure multiple segments get created
insertSampleData(indexName, 3, 1000)

waitFor { assertTrue("Segment count for [$indexName] was less than expected", getSegmentCount(indexName) > 1) }
waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) }

// Set index to read-only
updateIndexSettings(indexName, Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true))
Expand All @@ -133,17 +132,20 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() {
// Second execution: Index was already read-only and should remain so for force_merge
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals("Set index to read-only", getExplainManagedIndexMetaData(indexName).info?.get("message")) }

waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) }

// Third execution: Force merge operation is kicked off
updateManagedIndexConfigStartTime(managedIndexConfig)
Thread.sleep(3000)

waitFor { assertEquals("Started force merge", getExplainManagedIndexMetaData(indexName).info?.get("message")) }

// Fourth execution: Waits for force merge to complete, which will happen in this execution since index is small
updateManagedIndexConfigStartTime(managedIndexConfig)
Thread.sleep(3000)

waitFor { assertEquals("Segment count for [$indexName] after force merge is incorrect", 1, getSegmentCount(indexName)) }
waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) }
waitFor { assertEquals("Force merge completed", getExplainManagedIndexMetaData(indexName).info?.get("message")) }
assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1))
assertEquals("true", getIndexBlocksWriteSetting(indexName))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() {
private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT)

fun `test basic`() {
val indexName = "${testIndexName}_index_1"
val policyID = "${testIndexName}_testPolicyName_1"
val indexName = "${testIndexName}_index_basic"
val policyID = "${testIndexName}_policy_basic"
val repository = "repository"
val snapshot = "snapshot"
val actionConfig = SnapshotActionConfig(repository, snapshot, 0)
Expand Down Expand Up @@ -62,15 +62,14 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() {

// Need to wait two cycles for wait for snapshot step
updateManagedIndexConfigStartTime(managedIndexConfig)
Thread.sleep(3000)

waitFor { assertSnapshotExists(repository, snapshot) }
waitFor { assertSnapshotFinishedWithSuccess(repository, snapshot) }
}

fun `test successful wait for snapshot step`() {
val indexName = "${testIndexName}_index_1"
val policyID = "${testIndexName}_testPolicyName_1"
val indexName = "${testIndexName}_index_success"
val policyID = "${testIndexName}_policy_success"
val repository = "repository"
val snapshot = "snapshot_success_test"
val actionConfig = SnapshotActionConfig(repository, snapshot, 0)
Expand Down Expand Up @@ -118,8 +117,8 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() {
}

fun `test failed wait for snapshot step`() {
val indexName = "${testIndexName}_index_1"
val policyID = "${testIndexName}_testPolicyName_1"
val indexName = "${testIndexName}_index_failed"
val policyID = "${testIndexName}_policy_failed"
val repository = "repository"
val snapshot = "snapshot_failed_test"
val actionConfig = SnapshotActionConfig(repository, snapshot, 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,9 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() {

// Confirm job was disabled
val disabledManagedIndexConfig: ManagedIndexConfig = waitFor {
val config = getExistingManagedIndexConfig(indexName)
assertEquals("ManagedIndexConfig was not disabled", false, config.enabled)
val config = getManagedIndexConfigByDocId(managedIndexConfig.id)
assertNotNull("Could not find ManagedIndexConfig", config)
assertEquals("ManagedIndexConfig was not disabled", false, config!!.enabled)
config
}

Expand Down Expand Up @@ -239,7 +240,7 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() {
// Add sample data to increase segment count, passing in a delay to ensure multiple segments get created
insertSampleData(indexName, 3, 1000)

waitFor { assertTrue("Segment count for [$indexName] was less than expected", getSegmentCount(indexName) > 1) }
waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) }

val managedIndexConfig = getExistingManagedIndexConfig(indexName)

Expand All @@ -258,7 +259,6 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() {

// Third execution: Force merge operation is kicked off
updateManagedIndexConfigStartTime(managedIndexConfig)
Thread.sleep(3000)

// Verify maxNumSegments is set in action properties when kicking off force merge
waitFor {
Expand All @@ -274,13 +274,15 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() {

// Fourth execution: WaitForForceMergeStep is not safe to disable on, so the job should not disable yet
updateManagedIndexConfigStartTime(managedIndexConfig)
Thread.sleep(3000)

// Confirm we successfully executed the WaitForForceMergeStep
waitFor { assertEquals("Force merge completed", getExplainManagedIndexMetaData(indexName).info?.get("message")) }

// Confirm job was not disabled
assertEquals("ManagedIndexConfig was disabled early", true, getExistingManagedIndexConfig(indexName).enabled)

// Validate segments were merged
waitFor { assertEquals("Segment count for [$indexName] after force merge is incorrect", 1, getSegmentCount(indexName)) }
assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1))

// Fifth execution: Attempt transition, which is safe to disable on, so job should be disabled
updateManagedIndexConfigStartTime(managedIndexConfig)
Expand Down
Loading