Skip to content

Commit

Permalink
integrate security-analytics & alerting for correlation engine (#412) (
Browse files Browse the repository at this point in the history
…#415)

Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
opensearch-trigger-bot[bot] authored Apr 19, 2023
1 parent 09e5c5f commit d8fb094
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import org.opensearch.commons.alerting.action.GetFindingsRequest
import org.opensearch.commons.alerting.action.GetFindingsResponse
import org.opensearch.commons.alerting.action.IndexMonitorRequest
import org.opensearch.commons.alerting.action.IndexMonitorResponse
import org.opensearch.commons.alerting.action.PublishFindingsRequest
import org.opensearch.commons.alerting.action.SubscribeFindingsResponse
import org.opensearch.commons.notifications.action.BaseResponse
import org.opensearch.commons.utils.recreateObject

Expand Down Expand Up @@ -143,6 +145,24 @@ object AlertingPluginInterface {
)
}

fun publishFinding(
client: NodeClient,
request: PublishFindingsRequest,
listener: ActionListener<SubscribeFindingsResponse>
) {
client.execute(
AlertingActions.SUBSCRIBE_FINDINGS_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
SubscribeFindingsResponse(
it
)
}
}
)
}

@Suppress("UNCHECKED_CAST")
private fun <Response : BaseResponse> wrapActionListener(
listener: ActionListener<Response>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ object AlertingActions {
const val DELETE_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/delete"
const val GET_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/get"
const val ACKNOWLEDGE_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/ack"
const val SUBSCRIBE_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/subscribe"

@JvmField
val INDEX_MONITOR_ACTION_TYPE =
Expand All @@ -32,4 +33,7 @@ object AlertingActions {
@JvmField
val ACKNOWLEDGE_ALERTS_ACTION_TYPE =
ActionType(ACKNOWLEDGE_ALERTS_ACTION_NAME, ::AcknowledgeAlertResponse)
@JvmField
val SUBSCRIBE_FINDINGS_ACTION_TYPE =
ActionType(SUBSCRIBE_FINDINGS_ACTION_NAME, ::SubscribeFindingsResponse)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.commons.alerting.model.Finding
import java.io.IOException

class PublishFindingsRequest : ActionRequest {

val monitorId: String

val finding: Finding

constructor(
monitorId: String,
finding: Finding
) : super() {
this.monitorId = monitorId
this.finding = finding
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
monitorId = sin.readString(),
finding = Finding.readFrom(sin)
)

override fun validate(): ActionRequestValidationException? {
return null
}

override fun writeTo(out: StreamOutput) {
out.writeString(monitorId)
finding.writeTo(out)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.opensearch.commons.alerting.action

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.commons.notifications.action.BaseResponse
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.rest.RestStatus
import java.io.IOException

class SubscribeFindingsResponse : BaseResponse {

private var status: RestStatus

constructor(status: RestStatus) : super() {
this.status = status
}

@Throws(IOException::class)
constructor(sin: StreamInput) {
this.status = sin.readEnum(RestStatus::class.java)
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeEnum(status)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field("status", status.status)
return builder.endObject()
}

override fun getStatus(): RestStatus {
return this.status
}
}
14 changes: 14 additions & 0 deletions src/main/kotlin/org/opensearch/commons/alerting/model/Finding.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import java.time.Instant
class Finding(
val id: String = NO_ID,
val relatedDocIds: List<String>,
val correlatedDocIds: List<String> = listOf(),
val monitorId: String,
val monitorName: String,
val index: String,
Expand All @@ -28,6 +29,7 @@ class Finding(
constructor(sin: StreamInput) : this(
id = sin.readString(),
relatedDocIds = sin.readStringList(),
correlatedDocIds = sin.readStringList(),
monitorId = sin.readString(),
monitorName = sin.readString(),
index = sin.readString(),
Expand All @@ -39,6 +41,7 @@ class Finding(
return mapOf(
FINDING_ID_FIELD to id,
RELATED_DOC_IDS_FIELD to relatedDocIds,
CORRELATED_DOC_IDS_FIELD to correlatedDocIds,
MONITOR_ID_FIELD to monitorId,
MONITOR_NAME_FIELD to monitorName,
INDEX_FIELD to index,
Expand All @@ -51,6 +54,7 @@ class Finding(
builder.startObject()
.field(FINDING_ID_FIELD, id)
.field(RELATED_DOC_IDS_FIELD, relatedDocIds)
.field(CORRELATED_DOC_IDS_FIELD, correlatedDocIds)
.field(MONITOR_ID_FIELD, monitorId)
.field(MONITOR_NAME_FIELD, monitorName)
.field(INDEX_FIELD, index)
Expand All @@ -64,6 +68,7 @@ class Finding(
override fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeStringCollection(relatedDocIds)
out.writeStringCollection(correlatedDocIds)
out.writeString(monitorId)
out.writeString(monitorName)
out.writeString(index)
Expand All @@ -74,6 +79,7 @@ class Finding(
companion object {
const val FINDING_ID_FIELD = "id"
const val RELATED_DOC_IDS_FIELD = "related_doc_ids"
const val CORRELATED_DOC_IDS_FIELD = "correlated_doc_ids"
const val MONITOR_ID_FIELD = "monitor_id"
const val MONITOR_NAME_FIELD = "monitor_name"
const val INDEX_FIELD = "index"
Expand All @@ -87,6 +93,7 @@ class Finding(
fun parse(xcp: XContentParser): Finding {
var id: String = NO_ID
val relatedDocIds: MutableList<String> = mutableListOf()
val correlatedDocIds: MutableList<String> = mutableListOf()
lateinit var monitorId: String
lateinit var monitorName: String
lateinit var index: String
Expand All @@ -106,6 +113,12 @@ class Finding(
relatedDocIds.add(xcp.text())
}
}
CORRELATED_DOC_IDS_FIELD -> {
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
correlatedDocIds.add(xcp.text())
}
}
MONITOR_ID_FIELD -> monitorId = xcp.text()
MONITOR_NAME_FIELD -> monitorName = xcp.text()
INDEX_FIELD -> index = xcp.text()
Expand All @@ -124,6 +137,7 @@ class Finding(
return Finding(
id = id,
relatedDocIds = relatedDocIds,
correlatedDocIds = correlatedDocIds,
monitorId = monitorId,
monitorName = monitorName,
index = index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ internal class GetFindingsResponseTests {
val finding1 = Finding(
"1",
listOf("doc1", "doc2", "doc3"),
listOf("doc1", "doc2", "doc3"),
"monitor_id1",
"monitor_name1",
"test_index1",
Expand All @@ -39,6 +40,7 @@ internal class GetFindingsResponseTests {
val finding2 = Finding(
"1",
listOf("doc21", "doc22"),
listOf("doc21", "doc22"),
"monitor_id2",
"monitor_name2",
"test_index2",
Expand Down

0 comments on commit d8fb094

Please sign in to comment.