Skip to content

Commit

Permalink
Move get monitor and search monitor action / request / responses to c…
Browse files Browse the repository at this point in the history
…ommon-utils (opensearch-project#566)

* Add get monitor request/response

Signed-off-by: Tyler Ohlsen <[email protected]>

* Remove status from response; add to interface

Signed-off-by: Tyler Ohlsen <[email protected]>

* Add UT

Signed-off-by: Tyler Ohlsen <[email protected]>

* Repeat for search monitor action

Signed-off-by: Tyler Ohlsen <[email protected]>

---------

Signed-off-by: Tyler Ohlsen <[email protected]>
  • Loading branch information
ohltyler authored and engechas committed Feb 9, 2024
1 parent bd2a4f1 commit 00147e4
Show file tree
Hide file tree
Showing 9 changed files with 468 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package org.opensearch.commons.alerting

import org.opensearch.action.search.SearchResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest
import org.opensearch.commons.alerting.action.AcknowledgeAlertResponse
Expand All @@ -17,6 +18,8 @@ import org.opensearch.commons.alerting.action.GetAlertsRequest
import org.opensearch.commons.alerting.action.GetAlertsResponse
import org.opensearch.commons.alerting.action.GetFindingsRequest
import org.opensearch.commons.alerting.action.GetFindingsResponse
import org.opensearch.commons.alerting.action.GetMonitorRequest
import org.opensearch.commons.alerting.action.GetMonitorResponse
import org.opensearch.commons.alerting.action.GetWorkflowAlertsRequest
import org.opensearch.commons.alerting.action.GetWorkflowAlertsResponse
import org.opensearch.commons.alerting.action.GetWorkflowRequest
Expand All @@ -26,6 +29,7 @@ import org.opensearch.commons.alerting.action.IndexMonitorResponse
import org.opensearch.commons.alerting.action.IndexWorkflowRequest
import org.opensearch.commons.alerting.action.IndexWorkflowResponse
import org.opensearch.commons.alerting.action.PublishFindingsRequest
import org.opensearch.commons.alerting.action.SearchMonitorRequest
import org.opensearch.commons.alerting.action.SubscribeFindingsResponse
import org.opensearch.commons.notifications.action.BaseResponse
import org.opensearch.commons.utils.recreateObject
Expand Down Expand Up @@ -288,6 +292,51 @@ object AlertingPluginInterface {
)
}

/**
* Get Monitor interface.
* @param client Node client for making transport action
* @param request The request object
* @param listener The listener for getting response
*/
fun getMonitor(
client: NodeClient,
request: GetMonitorRequest,
listener: ActionListener<GetMonitorResponse>
) {
client.execute(
AlertingActions.GET_MONITOR_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
GetMonitorResponse(
it
)
}
}
)
}

/**
* Search Monitors interface.
* @param client Node client for making transport action
* @param request The request object
* @param listener The listener for getting response
*/
fun searchMonitors(
client: NodeClient,
request: SearchMonitorRequest,
listener: ActionListener<SearchResponse>
) {
client.execute(
AlertingActions.SEARCH_MONITORS_ACTION_TYPE,
request,
// we do not use the wrapActionListener in this case since there is no need
// to recreate any object or specially handle onResponse / onFailure. It is
// simply returning a SearchResponse.
listener
)
}

@Suppress("UNCHECKED_CAST")
private fun <Response : BaseResponse> wrapActionListener(
listener: ActionListener<Response>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionType
import org.opensearch.action.search.SearchResponse

object AlertingActions {
const val INDEX_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/write"
Expand All @@ -18,6 +19,8 @@ object AlertingActions {
const val ACKNOWLEDGE_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/ack"
const val ACKNOWLEDGE_CHAINED_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/chained_alerts/ack"
const val SUBSCRIBE_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/subscribe"
const val GET_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/get"
const val SEARCH_MONITORS_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/search"

@JvmField
val INDEX_MONITOR_ACTION_TYPE =
Expand Down Expand Up @@ -60,4 +63,12 @@ object AlertingActions {
@JvmField
val ACKNOWLEDGE_CHAINED_ALERTS_ACTION_TYPE =
ActionType(ACKNOWLEDGE_CHAINED_ALERTS_ACTION_NAME, ::AcknowledgeAlertResponse)

@JvmField
val GET_MONITOR_ACTION_TYPE =
ActionType(GET_MONITOR_ACTION_NAME, ::GetMonitorResponse)

@JvmField
val SEARCH_MONITORS_ACTION_TYPE =
ActionType(SEARCH_MONITORS_ACTION_NAME, ::SearchResponse)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.rest.RestRequest
import org.opensearch.search.fetch.subphase.FetchSourceContext
import java.io.IOException

class GetMonitorRequest : ActionRequest {
val monitorId: String
val version: Long
val method: RestRequest.Method
val srcContext: FetchSourceContext?

constructor(
monitorId: String,
version: Long,
method: RestRequest.Method,
srcContext: FetchSourceContext?
) : super() {
this.monitorId = monitorId
this.version = version
this.method = method
this.srcContext = srcContext
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readString(), // monitorId
sin.readLong(), // version
sin.readEnum(RestRequest.Method::class.java), // method
if (sin.readBoolean()) {
FetchSourceContext(sin) // srcContext
} else null
)

override fun validate(): ActionRequestValidationException? {
return null
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(monitorId)
out.writeLong(version)
out.writeEnum(method)
out.writeBoolean(srcContext != null)
srcContext?.writeTo(out)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.action

import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.util.IndexUtils.Companion._ID
import org.opensearch.commons.alerting.util.IndexUtils.Companion._PRIMARY_TERM
import org.opensearch.commons.alerting.util.IndexUtils.Companion._SEQ_NO
import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION
import org.opensearch.commons.notifications.action.BaseResponse
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.ToXContentFragment
import org.opensearch.core.xcontent.XContentBuilder
import java.io.IOException

class GetMonitorResponse : BaseResponse {
var id: String
var version: Long
var seqNo: Long
var primaryTerm: Long
var monitor: Monitor?
var associatedWorkflows: List<AssociatedWorkflow>?

constructor(
id: String,
version: Long,
seqNo: Long,
primaryTerm: Long,
monitor: Monitor?,
associatedCompositeMonitors: List<AssociatedWorkflow>?,
) : super() {
this.id = id
this.version = version
this.seqNo = seqNo
this.primaryTerm = primaryTerm
this.monitor = monitor
this.associatedWorkflows = associatedCompositeMonitors ?: emptyList()
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
id = sin.readString(), // id
version = sin.readLong(), // version
seqNo = sin.readLong(), // seqNo
primaryTerm = sin.readLong(), // primaryTerm
monitor = if (sin.readBoolean()) {
Monitor.readFrom(sin) // monitor
} else null,
associatedCompositeMonitors = sin.readList((AssociatedWorkflow)::readFrom),
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeLong(version)
out.writeLong(seqNo)
out.writeLong(primaryTerm)
if (monitor != null) {
out.writeBoolean(true)
monitor?.writeTo(out)
} else {
out.writeBoolean(false)
}
associatedWorkflows?.forEach {
it.writeTo(out)
}
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field(_ID, id)
.field(_VERSION, version)
.field(_SEQ_NO, seqNo)
.field(_PRIMARY_TERM, primaryTerm)
if (monitor != null) {
builder.field("monitor", monitor)
}
if (associatedWorkflows != null) {
builder.field("associated_workflows", associatedWorkflows!!.toTypedArray())
}
return builder.endObject()
}

class AssociatedWorkflow : ToXContentFragment {
val id: String
val name: String

constructor(id: String, name: String) {
this.id = id
this.name = name
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params?): XContentBuilder {
builder.startObject()
.field("id", id)
.field("name", name)
.endObject()
return builder
}

fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeString(name)
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readString(),
sin.readString()
)

companion object {
@JvmStatic
@Throws(IOException::class)
fun readFrom(sin: StreamInput): AssociatedWorkflow {
return AssociatedWorkflow(sin)
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.action.search.SearchRequest
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import java.io.IOException

class SearchMonitorRequest : ActionRequest {

val searchRequest: SearchRequest

constructor(
searchRequest: SearchRequest
) : super() {
this.searchRequest = searchRequest
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
searchRequest = SearchRequest(sin)
)

override fun validate(): ActionRequestValidationException? {
return null
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
searchRequest.writeTo(out)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.mockito.Mockito
import org.mockito.Mockito.mock
import org.mockito.junit.jupiter.MockitoExtension
import org.opensearch.action.ActionType
import org.opensearch.action.search.SearchResponse
import org.opensearch.client.node.NodeClient
import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.action.AcknowledgeAlertRequest
Expand All @@ -23,13 +24,16 @@ import org.opensearch.commons.alerting.action.GetAlertsRequest
import org.opensearch.commons.alerting.action.GetAlertsResponse
import org.opensearch.commons.alerting.action.GetFindingsRequest
import org.opensearch.commons.alerting.action.GetFindingsResponse
import org.opensearch.commons.alerting.action.GetMonitorRequest
import org.opensearch.commons.alerting.action.GetMonitorResponse
import org.opensearch.commons.alerting.action.GetWorkflowAlertsRequest
import org.opensearch.commons.alerting.action.GetWorkflowAlertsResponse
import org.opensearch.commons.alerting.action.IndexMonitorRequest
import org.opensearch.commons.alerting.action.IndexMonitorResponse
import org.opensearch.commons.alerting.action.IndexWorkflowRequest
import org.opensearch.commons.alerting.action.IndexWorkflowResponse
import org.opensearch.commons.alerting.action.PublishFindingsRequest
import org.opensearch.commons.alerting.action.SearchMonitorRequest
import org.opensearch.commons.alerting.action.SubscribeFindingsResponse
import org.opensearch.commons.alerting.model.FindingDocument
import org.opensearch.commons.alerting.model.FindingWithDocs
Expand Down Expand Up @@ -250,4 +254,32 @@ internal class AlertingPluginInterfaceTests {
AlertingPluginInterface.acknowledgeChainedAlerts(client, request, listener)
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
}

@Test
fun getMonitor() {
val request = mock(GetMonitorRequest::class.java)
val response = GetMonitorResponse("test-id", 1, 1, 1, null, null)
val listener: ActionListener<GetMonitorResponse> =
mock(ActionListener::class.java) as ActionListener<GetMonitorResponse>
Mockito.doAnswer {
(it.getArgument(2) as ActionListener<GetMonitorResponse>)
.onResponse(response)
}.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any())
AlertingPluginInterface.getMonitor(client, request, listener)
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
}

@Test
fun searchMonitors() {
val request = mock(SearchMonitorRequest::class.java)
val response = mock(SearchResponse::class.java)
val listener: ActionListener<SearchResponse> =
mock(ActionListener::class.java) as ActionListener<SearchResponse>
Mockito.doAnswer {
(it.getArgument(2) as ActionListener<SearchResponse>)
.onResponse(response)
}.whenever(client).execute(Mockito.any(ActionType::class.java), Mockito.any(), Mockito.any())
AlertingPluginInterface.searchMonitors(client, request, listener)
Mockito.verify(listener, Mockito.times(1)).onResponse(ArgumentMatchers.eq(response))
}
}
Loading

0 comments on commit 00147e4

Please sign in to comment.