Skip to content

Commit

Permalink
Implemented support for configuring a cluster metrics monitor to call…
Browse files Browse the repository at this point in the history
… cat/indices, and cat/shards. (opensearch-project#992)

* Implemented support for configuring a cluster metrics monitor to call cat/indices, and cat/shards.

Signed-off-by: AWSHurneyt <[email protected]>

* Fixed ktlint errors.

Signed-off-by: AWSHurneyt <[email protected]>

* Refactored executeTransportAction to use suspendUntil() instead of get() to receive responses.

Signed-off-by: AWSHurneyt <[email protected]>

---------

Signed-off-by: AWSHurneyt <[email protected]>
  • Loading branch information
AWSHurneyt committed Jul 12, 2023
1 parent a3db266 commit b0c3106
Show file tree
Hide file tree
Showing 10 changed files with 1,751 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.util.AggregationQueryRewriter
import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.alerting.util.executeTransportAction
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap
import org.opensearch.alerting.util.getRoleFilterEnabled
import org.opensearch.alerting.util.toMap
import org.opensearch.alerting.util.use
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest
import org.opensearch.action.admin.indices.recovery.RecoveryRequest
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.CatIndicesRequestWrapper
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.CatShardsRequestWrapper
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.json.JsonXContent
import org.opensearch.commons.alerting.model.ClusterMetricsInput
Expand Down Expand Up @@ -84,12 +86,14 @@ class SupportedClusterMetricsSettings : org.opensearch.commons.alerting.settings
fun resolveToActionRequest(clusterMetricsInput: ClusterMetricsInput): ActionRequest {
val pathParams = clusterMetricsInput.parsePathParams()
return when (clusterMetricsInput.clusterMetricType) {
ClusterMetricType.CAT_INDICES -> CatIndicesRequestWrapper(pathParams)
ClusterMetricType.CAT_PENDING_TASKS -> PendingClusterTasksRequest()
ClusterMetricType.CAT_RECOVERY -> {
if (pathParams.isEmpty()) return RecoveryRequest()
val pathParamsArray = pathParams.split(",").toTypedArray()
return RecoveryRequest(*pathParamsArray)
}
ClusterMetricType.CAT_SHARDS -> CatShardsRequestWrapper(pathParams)
ClusterMetricType.CAT_SNAPSHOTS -> {
return GetSnapshotsRequest(pathParams, arrayOf(GetSnapshotsRequest.ALL_SNAPSHOTS))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import org.opensearch.index.IndexNotFoundException
class IndexUtils {

companion object {
val VALID_INDEX_NAME_REGEX = Regex("""^(?![_\-\+])(?!.*\.\.)[^\s,\\\/\*\?"<>|#:\.]{1,255}$""")

const val _META = "_meta"
const val SCHEMA_VERSION = "schema_version"

Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.util
package org.opensearch.alerting.util.clusterMetricsMonitorHelpers

import org.opensearch.action.ActionResponse
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
Expand All @@ -16,41 +16,76 @@ import org.opensearch.action.admin.cluster.settings.ClusterGetSettingsResponse
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.state.ClusterStateResponse
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest
import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksResponse
import org.opensearch.action.admin.indices.recovery.RecoveryRequest
import org.opensearch.action.admin.indices.recovery.RecoveryResponse
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.SupportedClusterMetricsSettings
import org.opensearch.alerting.settings.SupportedClusterMetricsSettings.Companion.resolveToActionRequest
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.Metadata
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.support.XContentMapValues
import org.opensearch.commons.alerting.model.ClusterMetricsInput
import kotlin.collections.ArrayList
import kotlin.collections.HashMap

/**
* Calls the appropriate transport action for the API requested in the [clusterMetricsInput].
* @param clusterMetricsInput The [ClusterMetricsInput] to resolve.
* @param client The [Client] used to call the respective transport action.
* @throws IllegalArgumentException When the requested API is not supported by this feature.
*/
fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse {
suspend fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse {
val request = resolveToActionRequest(clusterMetricsInput)
return when (clusterMetricsInput.clusterMetricType) {
ClusterMetricsInput.ClusterMetricType.CAT_PENDING_TASKS -> client.admin().cluster()
.pendingClusterTasks(request as PendingClusterTasksRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_RECOVERY -> client.admin().indices().recoveries(request as RecoveryRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_SNAPSHOTS -> client.admin().cluster().getSnapshots(request as GetSnapshotsRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_TASKS -> client.admin().cluster().listTasks(request as ListTasksRequest).get()
ClusterMetricsInput.ClusterMetricType.CLUSTER_HEALTH -> client.admin().cluster().health(request as ClusterHealthRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_INDICES -> {
request as CatIndicesRequestWrapper
val healthResponse: ClusterHealthResponse =
client.suspendUntil { admin().cluster().health(request.clusterHealthRequest) }
val indexSettingsResponse: GetSettingsResponse =
client.suspendUntil { client.admin().indices().getSettings(request.indexSettingsRequest) }
val indicesResponse: IndicesStatsResponse =
client.suspendUntil { client.admin().indices().stats(request.indicesStatsRequest) }
val stateResponse: ClusterStateResponse =
client.suspendUntil { client.admin().cluster().state(request.clusterStateRequest) }
return CatIndicesResponseWrapper(healthResponse, stateResponse, indexSettingsResponse, indicesResponse)
}
ClusterMetricsInput.ClusterMetricType.CAT_PENDING_TASKS ->
client.suspendUntil { client.admin().cluster().pendingClusterTasks(request as PendingClusterTasksRequest) }
ClusterMetricsInput.ClusterMetricType.CAT_RECOVERY ->
client.suspendUntil { client.admin().indices().recoveries(request as RecoveryRequest) }
ClusterMetricsInput.ClusterMetricType.CAT_SHARDS -> {
request as CatShardsRequestWrapper
val stateResponse: ClusterStateResponse =
client.suspendUntil { client.admin().cluster().state(request.clusterStateRequest) }
val indicesResponse: IndicesStatsResponse =
client.suspendUntil { client.admin().indices().stats(request.indicesStatsRequest) }
return CatShardsResponseWrapper(stateResponse, indicesResponse)
}
ClusterMetricsInput.ClusterMetricType.CAT_SNAPSHOTS ->
client.suspendUntil { client.admin().cluster().getSnapshots(request as GetSnapshotsRequest) }
ClusterMetricsInput.ClusterMetricType.CAT_TASKS ->
client.suspendUntil { client.admin().cluster().listTasks(request as ListTasksRequest) }
ClusterMetricsInput.ClusterMetricType.CLUSTER_HEALTH ->
client.suspendUntil { client.admin().cluster().health(request as ClusterHealthRequest) }
ClusterMetricsInput.ClusterMetricType.CLUSTER_SETTINGS -> {
val metadata = client.admin().cluster().state(request as ClusterStateRequest).get().state.metadata
val stateResponse: ClusterStateResponse =
client.suspendUntil { client.admin().cluster().state(request as ClusterStateRequest) }
val metadata: Metadata = stateResponse.state.metadata
return ClusterGetSettingsResponse(metadata.persistentSettings(), metadata.transientSettings(), Settings.EMPTY)
}
ClusterMetricsInput.ClusterMetricType.CLUSTER_STATS -> client.admin().cluster().clusterStats(request as ClusterStatsRequest).get()
ClusterMetricsInput.ClusterMetricType.NODES_STATS -> client.admin().cluster().nodesStats(request as NodesStatsRequest).get()
ClusterMetricsInput.ClusterMetricType.CLUSTER_STATS ->
client.suspendUntil { client.admin().cluster().clusterStats(request as ClusterStatsRequest) }
ClusterMetricsInput.ClusterMetricType.NODES_STATS ->
client.suspendUntil { client.admin().cluster().nodesStats(request as NodesStatsRequest) }
else -> throw IllegalArgumentException("Unsupported API request type: ${request.javaClass.name}")
}
}
Expand All @@ -74,6 +109,14 @@ fun ActionResponse.toMap(): Map<String, Any> {
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricsInput.ClusterMetricType.CLUSTER_SETTINGS.defaultPath)
)
is CatIndicesResponseWrapper -> redactFieldsFromResponse(
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricsInput.ClusterMetricType.CAT_INDICES.defaultPath)
)
is CatShardsResponseWrapper -> redactFieldsFromResponse(
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricsInput.ClusterMetricType.CAT_SHARDS.defaultPath)
)
is NodesStatsResponse -> redactFieldsFromResponse(
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricsInput.ClusterMetricType.NODES_STATS.defaultPath)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{
"/_cat/indices": {},
"/_cat/pending_tasks": {},
"/_cat/recovery": {},
"/_cat/shards": {},
"/_cat/snapshots": {},
"/_cat/tasks": {},
"/_cluster/health": {},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.util.clusterMetricsMonitorHelpers

import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.randomClusterMetricsInput
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.CatIndicesResponseWrapper.Companion.WRAPPER_FIELD
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.ClusterMetricsInput
import org.opensearch.core.common.Strings
import org.opensearch.test.OpenSearchSingleNodeTestCase

class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() {
private val path = ClusterMetricsInput.ClusterMetricType.CAT_INDICES.defaultPath

fun `test CatIndicesRequestWrapper validate valid pathParams`() {
// GIVEN
val pathParams = "index1,index-name-2,index-3"

// WHEN
val requestWrapper = CatIndicesRequestWrapper(pathParams = pathParams)

// THEN
assertEquals(3, requestWrapper.clusterHealthRequest.indices().size)
assertEquals(3, requestWrapper.clusterStateRequest.indices().size)
assertEquals(3, requestWrapper.indexSettingsRequest.indices().size)
assertEquals(3, requestWrapper.indicesStatsRequest.indices().size)
}

fun `test CatIndicesRequestWrapper validate without providing pathParams`() {
// GIVEN & WHEN
val requestWrapper = CatIndicesRequestWrapper()

// THEN
assertNull(requestWrapper.clusterHealthRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.clusterStateRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.indexSettingsRequest.indices())
assertNull(requestWrapper.indicesStatsRequest.indices())
}

fun `test CatIndicesRequestWrapper validate blank pathParams`() {
// GIVEN
val pathParams = " "

// WHEN
val requestWrapper = CatIndicesRequestWrapper(pathParams = pathParams)

// THEN
assertNull(requestWrapper.clusterHealthRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.clusterStateRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.indexSettingsRequest.indices())
assertNull(requestWrapper.indicesStatsRequest.indices())
}

fun `test CatIndicesRequestWrapper validate empty pathParams`() {
// GIVEN
val pathParams = ""

// WHEN
val requestWrapper = CatIndicesRequestWrapper(pathParams = pathParams)

// THEN
assertNull(requestWrapper.clusterHealthRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.clusterStateRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.indexSettingsRequest.indices())
assertNull(requestWrapper.indicesStatsRequest.indices())
}

fun `test CatIndicesRequestWrapper validate invalid pathParams`() {
// GIVEN
val pathParams = "_index1,index^2"

// WHEN & THEN
assertThrows(IllegalArgumentException::class.java) { CatIndicesRequestWrapper(pathParams = pathParams) }
}

suspend fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
}.toMap()

testIndices.forEach { (indexName, docCount) ->
repeat(docCount) {
val docId = (it + 1).toString()
val docMessage = """
{
"message": "$indexName doc num $docId"
}
""".trimIndent()
indexDoc(indexName, docId, docMessage)
}
}

/*
Creating a subset of indices to use for the pathParams to test that all indices on the cluster ARE NOT returned.
*/
val pathParamsIndices = testIndices.keys.toList().subList(1, testIndices.size - 1)
val pathParams = pathParamsIndices.joinToString(",")
val input = randomClusterMetricsInput(path = path, pathParams = pathParams)

// WHEN
val responseMap = (executeTransportAction(input, client())).toMap()

// THEN
val shards = responseMap[WRAPPER_FIELD] as List<HashMap<String, String>>
val returnedIndices =
shards.map { (it[CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD] as String) to it }.toMap()

assertEquals(pathParamsIndices.size, returnedIndices.keys.size)
testIndices.forEach { (indexName, docCount) ->
if (pathParamsIndices.contains(indexName)) {
assertEquals(
indexName,
returnedIndices[indexName]?.get(CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD) as String
)
assertEquals(
docCount.toString(),
returnedIndices[indexName]?.get(CatIndicesResponseWrapper.IndexInfo.DOCS_COUNT_FIELD) as String
)
}
}
}

suspend fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
}.toMap()

testIndices.forEach { (indexName, docCount) ->
repeat(docCount) {
val docId = (it + 1).toString()
val docMessage = """
{
"message": "$indexName doc num $docId"
}
""".trimIndent()
indexDoc(indexName, docId, docMessage)
}
}

val input = randomClusterMetricsInput(path = path)

// WHEN
val responseMap = (executeTransportAction(input, client())).toMap()

// THEN
val shards = responseMap[WRAPPER_FIELD] as List<HashMap<String, String>>
val returnedIndices =
shards.map { (it[CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD] as String) to it }.toMap()

assertEquals(testIndices.size, returnedIndices.keys.size)
testIndices.forEach { (indexName, docCount) ->
assertEquals(
indexName,
returnedIndices[indexName]?.get(CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD) as String
)
assertEquals(
docCount.toString(),
returnedIndices[indexName]?.get(CatIndicesResponseWrapper.IndexInfo.DOCS_COUNT_FIELD) as String
)
}
}

private fun indexDoc(index: String, id: String, doc: String) {
client().prepareIndex(index).setId(id)
.setSource(doc, XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get()
}
}
Loading

0 comments on commit b0c3106

Please sign in to comment.