Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Adds support for multi-node run/testing and updates tests #254

Merged
merged 4 commits into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,11 @@ This project currently uses the Notification subproject from the [Alerting plugi

1. `./gradlew build` builds and tests project.
2. `./gradlew run` launches a single node cluster with the index management (and job-scheduler) plugin installed.
3. `./gradlew integTest` launches a single node cluster with the index management (and job-scheduler) plugin installed and runs all integ tests.
4. `./gradlew integTest -Dtests.class=*RestChangePolicyActionIT` runs a single integ class
5. `./gradlew integTest -Dtests.class=*RestChangePolicyActionIT -Dtests.method="test missing index"` runs a single integ test method (remember to quote the test method name if it contains spaces)
3. `./gradlew run -PnumNodes=3` launches a multi-node cluster with the index management (and job-scheduler) plugin installed.
4. `./gradlew integTest` launches a single node cluster with the index management (and job-scheduler) plugin installed and runs all integ tests.
5. `./gradlew integTest -PnumNodes=3` launches a multi-node cluster with the index management (and job-scheduler) plugin installed and runs all integ tests.
6. `./gradlew integTest -Dtests.class=*RestChangePolicyActionIT` runs a single integ class
7. `./gradlew integTest -Dtests.class=*RestChangePolicyActionIT -Dtests.method="test missing index"` runs a single integ test method (remember to quote the test method name if it contains spaces)

When launching a cluster using one of the above commands, logs are placed in `build/testclusters/integTest-0/logs`. Though the logs are teed to the console, in practice it's best to check the actual log file.

Expand Down
40 changes: 37 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,11 @@ apply plugin: 'elasticsearch.testclusters'
apply plugin: 'io.gitlab.arturbosch.detekt'
apply plugin: 'org.jetbrains.kotlin.jvm'
apply plugin: 'org.jetbrains.kotlin.plugin.allopen'
if (!System.properties.containsKey('tests.rest.cluster') && !System.properties.containsKey('tests.cluster')) {

// True when either the tests.rest.cluster or the tests.cluster system property is set
def usingRemoteCluster = System.properties.containsKey('tests.rest.cluster') || System.properties.containsKey('tests.cluster')
// Multi-node only when more than one node is requested, so that -PnumNodes=1 is still treated as single node
def usingMultiNode = (findProperty('numNodes') as Integer ?: 1) > 1
// Only apply jacoco test coverage if we are running a local single node cluster
if (!usingRemoteCluster && !usingMultiNode) {
    apply from: 'build-tools/esplugin-coverage.gradle'
}
check.dependsOn jacocoTestReport
Expand Down Expand Up @@ -143,21 +147,41 @@ test {
systemProperty 'tests.security.manager', 'false'
}

File repo = file("$buildDir/testclusters/repo")
def _numNodes = findProperty('numNodes') as Integer ?: 1
testClusters.integTest {
testDistribution = "OSS"
if (System.getProperty("cluster.debug") != null) {
jvmArgs('-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=5005')
// Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1
if (_numNodes > 1) numberOfNodes = _numNodes
// When running integration tests it doesn't forward the --debug-jvm to the cluster anymore
// i.e. we have to use a custom property to flag when we want to debug elasticsearch JVM
// since we also support multi node integration tests we increase debugPort per node
if (System.getProperty("es.debug") != null) {
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved
def debugPort = 5005
nodes.forEach { node ->
node.jvmArgs("-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=*:${debugPort}")
debugPort += 1
}
}
plugin(fileTree("src/test/resources/job-scheduler").getSingleFile())
setting 'path.repo', repo.absolutePath
}

integTest.runner {
systemProperty 'tests.security.manager', 'false'
systemProperty 'java.io.tmpdir', es_tmp_dir.absolutePath
systemProperty 'tests.path.repo', repo.absolutePath
// Tell the test JVM if the cluster JVM is running under a debugger so that tests can use longer timeouts for
// requests. The 'doFirst' delays reading the debug setting on the cluster till execution time.
doFirst {
systemProperty 'cluster.debug', getDebug()
// Set number of nodes system property to be used in tests
systemProperty 'cluster.number_of_nodes', "${_numNodes}"
// There seems to be an issue when running multi node run or integ tasks with unicast_hosts
// not being written, the waitForAllConditions ensures it's written
getClusters().forEach { cluster ->
cluster.waitForAllConditions()
}
}

// The -Dcluster.debug option makes the cluster debuggable; this makes the tests debuggable
Expand All @@ -166,6 +190,16 @@ integTest.runner {
}
}

run {
    doFirst {
        // Multi-node run or integ tasks seem to have an issue where unicast_hosts is not written;
        // calling waitForAllConditions on each cluster ensures it is written.
        for (cluster in getClusters()) {
            cluster.waitForAllConditions()
        }
    }
}

task ktlint(type: JavaExec, group: "verification") {
description = "Check Kotlin code style."
main = "com.pinterest.ktlint.Main"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.apache.http.entity.ContentType.APPLICATION_JSON
import org.apache.http.entity.StringEntity
import org.apache.http.message.BasicHeader
import org.elasticsearch.ElasticsearchParseException
import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.client.Request
import org.elasticsearch.client.Response
Expand Down Expand Up @@ -79,6 +80,7 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() {

private val isDebuggingTest = DisableOnDebug(null).isDebugging
private val isDebuggingRemoteCluster = System.getProperty("cluster.debug", "false")!!.toBoolean()
private val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1

fun Response.asMap(): Map<String, Any> = entityAsMap(this)

Expand Down Expand Up @@ -251,6 +253,17 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() {
}
}

/**
 * Fetches the document with the given [id] from the index state management index and parses it
 * into a [ManagedIndexConfig].
 *
 * Asserts that the GET request returns OK and that the document exists before parsing its source.
 *
 * @param id document id of the managed index config to look up
 * @return the config parsed from the document source, together with its sequence number and primary term
 */
protected fun getManagedIndexConfigByDocId(id: String): ManagedIndexConfig? {
    val docResponse = client().makeRequest("GET", "$INDEX_STATE_MANAGEMENT_INDEX/_doc/$id")
    assertEquals("Request failed", RestStatus.OK, docResponse.restStatus())

    val document = GetResponse.fromXContent(createParser(jsonXContent, docResponse.entity.content))
    assertTrue("Did not find managed index config", document.isExists)

    val sourceParser = createParser(jsonXContent, document.sourceAsBytesRef)
    return ManagedIndexConfig.parseWithType(sourceParser, id, document.seqNo, document.primaryTerm)
}

@Suppress("UNCHECKED_CAST")
protected fun getHistorySearchResponse(index: String): SearchResponse {
val request = """
Expand Down Expand Up @@ -293,7 +306,8 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() {
val intervalSchedule = (update.jobSchedule as IntervalSchedule)
val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis()
val startTimeMillis = desiredStartTimeMillis ?: Instant.now().toEpochMilli() - millis
val response = client().makeRequest("POST", "$INDEX_STATE_MANAGEMENT_INDEX/_update/${update.id}",
val waitForActiveShards = if (isMultiNode) "all" else "1"
val response = client().makeRequest("POST", "$INDEX_STATE_MANAGEMENT_INDEX/_update/${update.id}?wait_for_active_shards=$waitForActiveShards",
StringEntity(
"{\"doc\":{\"managed_index\":{\"schedule\":{\"interval\":{\"start_time\":" +
"\"$startTimeMillis\"}}}}}",
Expand Down Expand Up @@ -339,23 +353,31 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() {
}
}

// Validate segment count per shard by specifying the min and max it should be
@Suppress("UNCHECKED_CAST")
protected fun getSegmentCount(index: String): Int {
val statsResponse: Map<String, Any> = getStats(index)

// Assert that shard count of stats response is 1 since the stats request being used is at the index level
// (meaning the segment count in the response is aggregated) but segment count for force merge
// (which this method is primarily being used for) is going to be validated per shard
val shardsInfo = statsResponse["_shards"] as Map<String, Int>
assertEquals("Shard count higher than expected", 1, shardsInfo["successful"])

val indicesStats = statsResponse["indices"] as Map<String, Map<String, Map<String, Map<String, Any?>>>>
return indicesStats[index]!!["primaries"]!!["segments"]!!["count"] as Int
/**
 * Checks that every primary shard of [index] is started and has a segment count within the given bounds.
 *
 * Shard-level segment stats are read through [getShardSegmentStats]. A bound left as null is not enforced.
 *
 * @param index name of the index whose primary shards are inspected
 * @param min inclusive lower bound on each primary shard's segment count, or null for no lower bound
 * @param max inclusive upper bound on each primary shard's segment count, or null for no upper bound
 * @return true only if every primary shard is in the STARTED state and its segment count is within bounds
 * @throws IllegalArgumentException if both [min] and [max] are null
 */
protected fun validateSegmentCount(index: String, min: Int? = null, max: Int? = null): Boolean {
    require(min != null || max != null) { "Must provide at least a min or max" }
    val statsResponse: Map<String, Any> = getShardSegmentStats(index)

    val indicesStats = statsResponse["indices"] as Map<String, Map<String, Map<String, List<Map<String, Map<String, Any?>>>>>>
    return indicesStats[index]!!["shards"]!!.values.all { shardCopies ->
        shardCopies.filter { it["routing"]!!["primary"] == true }.all { primary ->
            logger.info("Checking primary shard segments for $primary")
            if (primary["routing"]!!["state"] != "STARTED") {
                false
            } else {
                // Yield the verdict as the lambda's value: a bare `return` inside these inline lambdas
                // would exit the whole function after the first primary shard and skip the rest.
                val count = primary["segments"]!!["count"] as Int
                (min == null || count >= min) && (max == null || count <= max)
            }
        }
    }
}

/** Get stats for [index] */
private fun getStats(index: String): Map<String, Any> {
val response = client().makeRequest("GET", "/$index/_stats")
/** Get shard segment stats for [index] */
private fun getShardSegmentStats(index: String): Map<String, Any> {
val response = client().makeRequest("GET", "/$index/_stats/segments?level=shards")

assertEquals("Stats request failed", RestStatus.OK, response.restStatus())

Expand Down Expand Up @@ -440,17 +462,7 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() {
assertEquals("Unable to create a new repository", RestStatus.OK, response.restStatus())
}

@Suppress("UNCHECKED_CAST")
private fun getRepoPath(): String {
val response = client()
.makeRequest(
"GET",
"_nodes",
emptyMap()
)
assertEquals("Unable to get a nodes settings", RestStatus.OK, response.restStatus())
return ((response.asMap()["nodes"] as HashMap<String, HashMap<String, HashMap<String, HashMap<String, Any>>>>).values.first()["settings"]!!["path"]!!["repo"] as List<String>)[0]
}
/**
 * Returns the snapshot repository path supplied through the `tests.path.repo` system property.
 *
 * @throws IllegalStateException if the property is not set
 */
private fun getRepoPath(): String =
    checkNotNull(System.getProperty("tests.path.repo")) { "System property [tests.path.repo] is not set" }

private fun getSnapshotsList(repository: String): List<Any> {
val response = client()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,12 @@ class ActionRetryIT : IndexStateManagementRestTestCase() {

// Second execution is to fail the step once.
updateManagedIndexConfigStartTime(managedIndexConfig)
Thread.sleep(3000)

waitFor { assertEquals(1, getExplainManagedIndexMetaData(indexName).actionMetaData?.consumedRetries) }

// Third execution should not run job since we have the retry backoff.
updateManagedIndexConfigStartTime(managedIndexConfig)
Thread.sleep(3000)
Thread.sleep(5000) // currently there is nothing to compare when backing off so we have to sleep

// Fourth execution should not run job since we have the retry backoff.
updateManagedIndexConfigStartTime(managedIndexConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() {
// Add sample data to increase segment count, passing in a delay to ensure multiple segments get created
insertSampleData(indexName, 3, 1000)

waitFor { assertTrue("Segment count for [$indexName] was less than expected", getSegmentCount(indexName) > 1) }
waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) }

val managedIndexConfig = getExistingManagedIndexConfig(indexName)

Expand All @@ -72,7 +72,7 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() {

// Third execution: Force merge operation is kicked off
updateManagedIndexConfigStartTime(managedIndexConfig)
Thread.sleep(3000)

// verify we set maxNumSegments in action properties when kicking off force merge
waitFor {
assertEquals(
Expand All @@ -84,9 +84,8 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() {

// Fourth execution: Waits for force merge to complete, which will happen in this execution since index is small
updateManagedIndexConfigStartTime(managedIndexConfig)
Thread.sleep(3000)

waitFor { assertEquals("Segment count for [$indexName] after force merge is incorrect", 1, getSegmentCount(indexName)) }
waitFor { assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1)) }
// verify we reset actionproperties at end of forcemerge
waitFor { assertNull("maxNumSegments was not reset", getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties) }
// index should still be readonly after force merge finishes
Expand Down Expand Up @@ -117,7 +116,7 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() {
// Add sample data to increase segment count, passing in a delay to ensure multiple segments get created
insertSampleData(indexName, 3, 1000)

waitFor { assertTrue("Segment count for [$indexName] was less than expected", getSegmentCount(indexName) > 1) }
waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) }

// Set index to read-only
updateIndexSettings(indexName, Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true))
Expand All @@ -133,17 +132,20 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() {
// Second execution: Index was already read-only and should remain so for force_merge
updateManagedIndexConfigStartTime(managedIndexConfig)

waitFor { assertEquals("Set index to read-only", getExplainManagedIndexMetaData(indexName).info?.get("message")) }

waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) }

// Third execution: Force merge operation is kicked off
updateManagedIndexConfigStartTime(managedIndexConfig)
Thread.sleep(3000)

waitFor { assertEquals("Started force merge", getExplainManagedIndexMetaData(indexName).info?.get("message")) }

// Fourth execution: Waits for force merge to complete, which will happen in this execution since index is small
updateManagedIndexConfigStartTime(managedIndexConfig)
Thread.sleep(3000)

waitFor { assertEquals("Segment count for [$indexName] after force merge is incorrect", 1, getSegmentCount(indexName)) }
waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) }
waitFor { assertEquals("Force merge completed", getExplainManagedIndexMetaData(indexName).info?.get("message")) }
assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1))
assertEquals("true", getIndexBlocksWriteSetting(indexName))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() {
private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT)

fun `test basic`() {
val indexName = "${testIndexName}_index_1"
val policyID = "${testIndexName}_testPolicyName_1"
val indexName = "${testIndexName}_index_basic"
val policyID = "${testIndexName}_policy_basic"
val repository = "repository"
val snapshot = "snapshot"
val actionConfig = SnapshotActionConfig(repository, snapshot, 0)
Expand Down Expand Up @@ -62,15 +62,14 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() {

// Need to wait two cycles for wait for snapshot step
updateManagedIndexConfigStartTime(managedIndexConfig)
Thread.sleep(3000)

waitFor { assertSnapshotExists(repository, snapshot) }
waitFor { assertSnapshotFinishedWithSuccess(repository, snapshot) }
}

fun `test successful wait for snapshot step`() {
val indexName = "${testIndexName}_index_1"
val policyID = "${testIndexName}_testPolicyName_1"
val indexName = "${testIndexName}_index_success"
val policyID = "${testIndexName}_policy_success"
val repository = "repository"
val snapshot = "snapshot_success_test"
val actionConfig = SnapshotActionConfig(repository, snapshot, 0)
Expand Down Expand Up @@ -118,8 +117,8 @@ class SnapshotActionIT : IndexStateManagementRestTestCase() {
}

fun `test failed wait for snapshot step`() {
val indexName = "${testIndexName}_index_1"
val policyID = "${testIndexName}_testPolicyName_1"
val indexName = "${testIndexName}_index_failed"
val policyID = "${testIndexName}_policy_failed"
val repository = "repository"
val snapshot = "snapshot_failed_test"
val actionConfig = SnapshotActionConfig(repository, snapshot, 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,9 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() {

// Confirm job was disabled
val disabledManagedIndexConfig: ManagedIndexConfig = waitFor {
val config = getExistingManagedIndexConfig(indexName)
assertEquals("ManagedIndexConfig was not disabled", false, config.enabled)
val config = getManagedIndexConfigByDocId(managedIndexConfig.id)
assertNotNull("Could not find ManagedIndexConfig", config)
assertEquals("ManagedIndexConfig was not disabled", false, config!!.enabled)
config
}

Expand Down Expand Up @@ -239,7 +240,7 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() {
// Add sample data to increase segment count, passing in a delay to ensure multiple segments get created
insertSampleData(indexName, 3, 1000)

waitFor { assertTrue("Segment count for [$indexName] was less than expected", getSegmentCount(indexName) > 1) }
waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) }

val managedIndexConfig = getExistingManagedIndexConfig(indexName)

Expand All @@ -258,7 +259,6 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() {

// Third execution: Force merge operation is kicked off
updateManagedIndexConfigStartTime(managedIndexConfig)
Thread.sleep(3000)

// Verify maxNumSegments is set in action properties when kicking off force merge
waitFor {
Expand All @@ -274,13 +274,15 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() {

// Fourth execution: WaitForForceMergeStep is not safe to disable on, so the job should not disable yet
updateManagedIndexConfigStartTime(managedIndexConfig)
Thread.sleep(3000)

// Confirm we successfully executed the WaitForForceMergeStep
waitFor { assertEquals("Force merge completed", getExplainManagedIndexMetaData(indexName).info?.get("message")) }

// Confirm job was not disabled
assertEquals("ManagedIndexConfig was disabled early", true, getExistingManagedIndexConfig(indexName).enabled)

// Validate segments were merged
waitFor { assertEquals("Segment count for [$indexName] after force merge is incorrect", 1, getSegmentCount(indexName)) }
assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1))

// Fifth execution: Attempt transition, which is safe to disable on, so job should be disabled
updateManagedIndexConfigStartTime(managedIndexConfig)
Expand Down
Loading