Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented support for configuring a cluster metrics monitor to call cat/indices, and cat/shards. #992

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.util.AggregationQueryRewriter
import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.alerting.util.executeTransportAction
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap
import org.opensearch.alerting.util.getRoleFilterEnabled
import org.opensearch.alerting.util.toMap
import org.opensearch.alerting.util.use
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest
import org.opensearch.action.admin.indices.recovery.RecoveryRequest
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.CatIndicesRequestWrapper
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.CatShardsRequestWrapper
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.json.JsonXContent
import org.opensearch.commons.alerting.model.ClusterMetricsInput
Expand Down Expand Up @@ -84,12 +86,14 @@ class SupportedClusterMetricsSettings : org.opensearch.commons.alerting.settings
fun resolveToActionRequest(clusterMetricsInput: ClusterMetricsInput): ActionRequest {
val pathParams = clusterMetricsInput.parsePathParams()
return when (clusterMetricsInput.clusterMetricType) {
ClusterMetricType.CAT_INDICES -> CatIndicesRequestWrapper(pathParams)
ClusterMetricType.CAT_PENDING_TASKS -> PendingClusterTasksRequest()
ClusterMetricType.CAT_RECOVERY -> {
if (pathParams.isEmpty()) return RecoveryRequest()
val pathParamsArray = pathParams.split(",").toTypedArray()
return RecoveryRequest(*pathParamsArray)
}
ClusterMetricType.CAT_SHARDS -> CatShardsRequestWrapper(pathParams)
ClusterMetricType.CAT_SNAPSHOTS -> {
return GetSnapshotsRequest(pathParams, arrayOf(GetSnapshotsRequest.ALL_SNAPSHOTS))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import org.opensearch.index.IndexNotFoundException
class IndexUtils {

companion object {
val VALID_INDEX_NAME_REGEX = Regex("""^(?![_\-\+])(?!.*\.\.)[^\s,\\\/\*\?"<>|#:\.]{1,255}$""")
AWSHurneyt marked this conversation as resolved.
Show resolved Hide resolved

const val _META = "_meta"
const val SCHEMA_VERSION = "schema_version"

Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.util
package org.opensearch.alerting.util.clusterMetricsMonitorHelpers

import org.opensearch.action.ActionResponse
import org.opensearch.action.admin.cluster.health.ClusterHealthRequest
Expand All @@ -16,41 +16,76 @@ import org.opensearch.action.admin.cluster.settings.ClusterGetSettingsResponse
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
import org.opensearch.action.admin.cluster.state.ClusterStateRequest
import org.opensearch.action.admin.cluster.state.ClusterStateResponse
import org.opensearch.action.admin.cluster.stats.ClusterStatsRequest
import org.opensearch.action.admin.cluster.stats.ClusterStatsResponse
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksResponse
import org.opensearch.action.admin.indices.recovery.RecoveryRequest
import org.opensearch.action.admin.indices.recovery.RecoveryResponse
import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.SupportedClusterMetricsSettings
import org.opensearch.alerting.settings.SupportedClusterMetricsSettings.Companion.resolveToActionRequest
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.Metadata
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.support.XContentMapValues
import org.opensearch.commons.alerting.model.ClusterMetricsInput
import kotlin.collections.ArrayList
import kotlin.collections.HashMap

/**
* Calls the appropriate transport action for the API requested in the [clusterMetricsInput].
* @param clusterMetricsInput The [ClusterMetricsInput] to resolve.
* @param client The [Client] used to call the respective transport action.
* @throws IllegalArgumentException When the requested API is not supported by this feature.
*/
fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse {
suspend fun executeTransportAction(clusterMetricsInput: ClusterMetricsInput, client: Client): ActionResponse {
val request = resolveToActionRequest(clusterMetricsInput)
return when (clusterMetricsInput.clusterMetricType) {
ClusterMetricsInput.ClusterMetricType.CAT_PENDING_TASKS -> client.admin().cluster()
.pendingClusterTasks(request as PendingClusterTasksRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_RECOVERY -> client.admin().indices().recoveries(request as RecoveryRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_SNAPSHOTS -> client.admin().cluster().getSnapshots(request as GetSnapshotsRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_TASKS -> client.admin().cluster().listTasks(request as ListTasksRequest).get()
ClusterMetricsInput.ClusterMetricType.CLUSTER_HEALTH -> client.admin().cluster().health(request as ClusterHealthRequest).get()
ClusterMetricsInput.ClusterMetricType.CAT_INDICES -> {
request as CatIndicesRequestWrapper
val healthResponse: ClusterHealthResponse =
client.suspendUntil { admin().cluster().health(request.clusterHealthRequest) }
val indexSettingsResponse: GetSettingsResponse =
client.suspendUntil { client.admin().indices().getSettings(request.indexSettingsRequest) }
val indicesResponse: IndicesStatsResponse =
client.suspendUntil { client.admin().indices().stats(request.indicesStatsRequest) }
val stateResponse: ClusterStateResponse =
client.suspendUntil { client.admin().cluster().state(request.clusterStateRequest) }
return CatIndicesResponseWrapper(healthResponse, stateResponse, indexSettingsResponse, indicesResponse)
}
ClusterMetricsInput.ClusterMetricType.CAT_PENDING_TASKS ->
client.suspendUntil { client.admin().cluster().pendingClusterTasks(request as PendingClusterTasksRequest) }
ClusterMetricsInput.ClusterMetricType.CAT_RECOVERY ->
client.suspendUntil { client.admin().indices().recoveries(request as RecoveryRequest) }
ClusterMetricsInput.ClusterMetricType.CAT_SHARDS -> {
request as CatShardsRequestWrapper
val stateResponse: ClusterStateResponse =
client.suspendUntil { client.admin().cluster().state(request.clusterStateRequest) }
val indicesResponse: IndicesStatsResponse =
client.suspendUntil { client.admin().indices().stats(request.indicesStatsRequest) }
return CatShardsResponseWrapper(stateResponse, indicesResponse)
}
ClusterMetricsInput.ClusterMetricType.CAT_SNAPSHOTS ->
client.suspendUntil { client.admin().cluster().getSnapshots(request as GetSnapshotsRequest) }
ClusterMetricsInput.ClusterMetricType.CAT_TASKS ->
client.suspendUntil { client.admin().cluster().listTasks(request as ListTasksRequest) }
ClusterMetricsInput.ClusterMetricType.CLUSTER_HEALTH ->
client.suspendUntil { client.admin().cluster().health(request as ClusterHealthRequest) }
ClusterMetricsInput.ClusterMetricType.CLUSTER_SETTINGS -> {
val metadata = client.admin().cluster().state(request as ClusterStateRequest).get().state.metadata
val stateResponse: ClusterStateResponse =
client.suspendUntil { client.admin().cluster().state(request as ClusterStateRequest) }
val metadata: Metadata = stateResponse.state.metadata
return ClusterGetSettingsResponse(metadata.persistentSettings(), metadata.transientSettings(), Settings.EMPTY)
}
ClusterMetricsInput.ClusterMetricType.CLUSTER_STATS -> client.admin().cluster().clusterStats(request as ClusterStatsRequest).get()
ClusterMetricsInput.ClusterMetricType.NODES_STATS -> client.admin().cluster().nodesStats(request as NodesStatsRequest).get()
ClusterMetricsInput.ClusterMetricType.CLUSTER_STATS ->
client.suspendUntil { client.admin().cluster().clusterStats(request as ClusterStatsRequest) }
ClusterMetricsInput.ClusterMetricType.NODES_STATS ->
client.suspendUntil { client.admin().cluster().nodesStats(request as NodesStatsRequest) }
else -> throw IllegalArgumentException("Unsupported API request type: ${request.javaClass.name}")
}
}
Expand All @@ -74,6 +109,14 @@ fun ActionResponse.toMap(): Map<String, Any> {
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricsInput.ClusterMetricType.CLUSTER_SETTINGS.defaultPath)
)
is CatIndicesResponseWrapper -> redactFieldsFromResponse(
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricsInput.ClusterMetricType.CAT_INDICES.defaultPath)
)
is CatShardsResponseWrapper -> redactFieldsFromResponse(
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricsInput.ClusterMetricType.CAT_SHARDS.defaultPath)
)
is NodesStatsResponse -> redactFieldsFromResponse(
this.convertToMap(),
SupportedClusterMetricsSettings.getSupportedJsonPayload(ClusterMetricsInput.ClusterMetricType.NODES_STATS.defaultPath)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{
"/_cat/indices": {},
"/_cat/pending_tasks": {},
"/_cat/recovery": {},
"/_cat/shards": {},
"/_cat/snapshots": {},
"/_cat/tasks": {},
"/_cluster/health": {},
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.util.clusterMetricsMonitorHelpers

import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.randomClusterMetricsInput
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.CatIndicesResponseWrapper.Companion.WRAPPER_FIELD
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.ClusterMetricsInput
import org.opensearch.core.common.Strings
import org.opensearch.test.OpenSearchSingleNodeTestCase

class CatIndicesWrappersIT : OpenSearchSingleNodeTestCase() {
private val path = ClusterMetricsInput.ClusterMetricType.CAT_INDICES.defaultPath

fun `test CatIndicesRequestWrapper validate valid pathParams`() {
// GIVEN
val pathParams = "index1,index-name-2,index-3"

// WHEN
val requestWrapper = CatIndicesRequestWrapper(pathParams = pathParams)

// THEN
assertEquals(3, requestWrapper.clusterHealthRequest.indices().size)
assertEquals(3, requestWrapper.clusterStateRequest.indices().size)
assertEquals(3, requestWrapper.indexSettingsRequest.indices().size)
assertEquals(3, requestWrapper.indicesStatsRequest.indices().size)
}

fun `test CatIndicesRequestWrapper validate without providing pathParams`() {
// GIVEN & WHEN
val requestWrapper = CatIndicesRequestWrapper()

// THEN
assertNull(requestWrapper.clusterHealthRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.clusterStateRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.indexSettingsRequest.indices())
assertNull(requestWrapper.indicesStatsRequest.indices())
}

fun `test CatIndicesRequestWrapper validate blank pathParams`() {
// GIVEN
val pathParams = " "

// WHEN
val requestWrapper = CatIndicesRequestWrapper(pathParams = pathParams)

// THEN
assertNull(requestWrapper.clusterHealthRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.clusterStateRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.indexSettingsRequest.indices())
assertNull(requestWrapper.indicesStatsRequest.indices())
}

fun `test CatIndicesRequestWrapper validate empty pathParams`() {
// GIVEN
val pathParams = ""

// WHEN
val requestWrapper = CatIndicesRequestWrapper(pathParams = pathParams)

// THEN
assertNull(requestWrapper.clusterHealthRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.clusterStateRequest.indices())
assertEquals(Strings.EMPTY_ARRAY, requestWrapper.indexSettingsRequest.indices())
assertNull(requestWrapper.indicesStatsRequest.indices())
}

fun `test CatIndicesRequestWrapper validate invalid pathParams`() {
// GIVEN
val pathParams = "_index1,index^2"

// WHEN & THEN
assertThrows(IllegalArgumentException::class.java) { CatIndicesRequestWrapper(pathParams = pathParams) }
}
Comment on lines +19 to +78
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move to a unit test class


suspend fun `test CatIndicesResponseWrapper returns with only indices in pathParams`() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets add an alias test too

// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
}.toMap()

testIndices.forEach { (indexName, docCount) ->
repeat(docCount) {
val docId = (it + 1).toString()
val docMessage = """
{
"message": "$indexName doc num $docId"
}
""".trimIndent()
indexDoc(indexName, docId, docMessage)
}
}

/*
Creating a subset of indices to use for the pathParams to test that all indices on the cluster ARE NOT returned.
*/
val pathParamsIndices = testIndices.keys.toList().subList(1, testIndices.size - 1)
val pathParams = pathParamsIndices.joinToString(",")
val input = randomClusterMetricsInput(path = path, pathParams = pathParams)

// WHEN
val responseMap = (executeTransportAction(input, client())).toMap()

// THEN
val shards = responseMap[WRAPPER_FIELD] as List<HashMap<String, String>>
val returnedIndices =
shards.map { (it[CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD] as String) to it }.toMap()

assertEquals(pathParamsIndices.size, returnedIndices.keys.size)
testIndices.forEach { (indexName, docCount) ->
if (pathParamsIndices.contains(indexName)) {
assertEquals(
indexName,
returnedIndices[indexName]?.get(CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD) as String
)
assertEquals(
docCount.toString(),
returnedIndices[indexName]?.get(CatIndicesResponseWrapper.IndexInfo.DOCS_COUNT_FIELD) as String
)
}
}
}
Comment on lines +114 to +126
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Future follow up to have a real cat indices call and compare all the values and not some.


suspend fun `test CatIndicesResponseWrapper returns with all indices when empty pathParams`() {
// GIVEN
val testIndices = (1..5).map {
"test-index${randomAlphaOfLength(10).lowercase()}" to randomIntBetween(1, 10)
}.toMap()

testIndices.forEach { (indexName, docCount) ->
repeat(docCount) {
val docId = (it + 1).toString()
val docMessage = """
{
"message": "$indexName doc num $docId"
}
""".trimIndent()
indexDoc(indexName, docId, docMessage)
}
}

val input = randomClusterMetricsInput(path = path)

// WHEN
val responseMap = (executeTransportAction(input, client())).toMap()

// THEN
val shards = responseMap[WRAPPER_FIELD] as List<HashMap<String, String>>
val returnedIndices =
shards.map { (it[CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD] as String) to it }.toMap()

assertEquals(testIndices.size, returnedIndices.keys.size)
testIndices.forEach { (indexName, docCount) ->
assertEquals(
indexName,
returnedIndices[indexName]?.get(CatIndicesResponseWrapper.IndexInfo.INDEX_FIELD) as String
)
assertEquals(
docCount.toString(),
returnedIndices[indexName]?.get(CatIndicesResponseWrapper.IndexInfo.DOCS_COUNT_FIELD) as String
)
}
}

private fun indexDoc(index: String, id: String, doc: String) {
client().prepareIndex(index).setId(id)
.setSource(doc, XContentType.JSON).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get()
}
}
Loading