Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

integrate security-analytics & alerting for correlation engine #412

Merged
merged 3 commits into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import org.opensearch.commons.alerting.action.GetFindingsRequest
import org.opensearch.commons.alerting.action.GetFindingsResponse
import org.opensearch.commons.alerting.action.IndexMonitorRequest
import org.opensearch.commons.alerting.action.IndexMonitorResponse
import org.opensearch.commons.alerting.action.PublishFindingsRequest
import org.opensearch.commons.alerting.action.SubscribeFindingsResponse
import org.opensearch.commons.notifications.action.BaseResponse
import org.opensearch.commons.utils.recreateObject

Expand Down Expand Up @@ -143,6 +145,24 @@ object AlertingPluginInterface {
)
}

fun publishFinding(
client: NodeClient,
request: PublishFindingsRequest,
listener: ActionListener<SubscribeFindingsResponse>
) {
client.execute(
AlertingActions.SUBSCRIBE_FINDINGS_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
SubscribeFindingsResponse(
it
)
}
}
)
}

@Suppress("UNCHECKED_CAST")
private fun <Response : BaseResponse> wrapActionListener(
listener: ActionListener<Response>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ object AlertingActions {
const val DELETE_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/delete"
const val GET_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/get"
const val ACKNOWLEDGE_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/ack"
const val SUBSCRIBE_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/subscribe"

@JvmField
val INDEX_MONITOR_ACTION_TYPE =
Expand All @@ -28,4 +29,7 @@ object AlertingActions {
@JvmField
val ACKNOWLEDGE_ALERTS_ACTION_TYPE =
ActionType(ACKNOWLEDGE_ALERTS_ACTION_NAME, ::AcknowledgeAlertResponse)
@JvmField
val SUBSCRIBE_FINDINGS_ACTION_TYPE =
ActionType(SUBSCRIBE_FINDINGS_ACTION_NAME, ::SubscribeFindingsResponse)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.commons.alerting.model.Finding
import java.io.IOException

class PublishFindingsRequest : ActionRequest {

val monitorId: String

val finding: Finding

constructor(
monitorId: String,
finding: Finding
) : super() {
this.monitorId = monitorId
this.finding = finding
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
monitorId = sin.readString(),
finding = Finding.readFrom(sin)
)

override fun validate(): ActionRequestValidationException? {
return null
}

override fun writeTo(out: StreamOutput) {
out.writeString(monitorId)
finding.writeTo(out)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.opensearch.commons.alerting.action

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.commons.notifications.action.BaseResponse
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.rest.RestStatus
import java.io.IOException

class SubscribeFindingsResponse : BaseResponse {

private var status: RestStatus

constructor(status: RestStatus) : super() {
this.status = status
}

@Throws(IOException::class)
constructor(sin: StreamInput) {
this.status = sin.readEnum(RestStatus::class.java)
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeEnum(status)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field("status", status.status)
return builder.endObject()
}

override fun getStatus(): RestStatus {
return this.status
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import java.time.Instant
class Finding(
val id: String = NO_ID,
val relatedDocIds: List<String>,
val correlatedDocIds: List<String> = listOf(),
val monitorId: String,
val monitorName: String,
val index: String,
Expand All @@ -28,6 +29,7 @@ class Finding(
constructor(sin: StreamInput) : this(
id = sin.readString(),
relatedDocIds = sin.readStringList(),
correlatedDocIds = sin.readStringList(),
monitorId = sin.readString(),
monitorName = sin.readString(),
index = sin.readString(),
Expand All @@ -39,6 +41,7 @@ class Finding(
return mapOf(
FINDING_ID_FIELD to id,
RELATED_DOC_IDS_FIELD to relatedDocIds,
CORRELATED_DOC_IDS_FIELD to correlatedDocIds,
MONITOR_ID_FIELD to monitorId,
MONITOR_NAME_FIELD to monitorName,
INDEX_FIELD to index,
Expand All @@ -51,6 +54,7 @@ class Finding(
builder.startObject()
.field(FINDING_ID_FIELD, id)
.field(RELATED_DOC_IDS_FIELD, relatedDocIds)
.field(CORRELATED_DOC_IDS_FIELD, correlatedDocIds)
.field(MONITOR_ID_FIELD, monitorId)
.field(MONITOR_NAME_FIELD, monitorName)
.field(INDEX_FIELD, index)
Expand All @@ -64,6 +68,7 @@ class Finding(
override fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeStringCollection(relatedDocIds)
out.writeStringCollection(correlatedDocIds)
out.writeString(monitorId)
out.writeString(monitorName)
out.writeString(index)
Expand All @@ -74,6 +79,7 @@ class Finding(
companion object {
const val FINDING_ID_FIELD = "id"
const val RELATED_DOC_IDS_FIELD = "related_doc_ids"
const val CORRELATED_DOC_IDS_FIELD = "correlated_doc_ids"
const val MONITOR_ID_FIELD = "monitor_id"
const val MONITOR_NAME_FIELD = "monitor_name"
const val INDEX_FIELD = "index"
Expand All @@ -86,6 +92,7 @@ class Finding(
fun parse(xcp: XContentParser): Finding {
var id: String = NO_ID
val relatedDocIds: MutableList<String> = mutableListOf()
val correlatedDocIds: MutableList<String> = mutableListOf()
lateinit var monitorId: String
lateinit var monitorName: String
lateinit var index: String
Expand All @@ -105,6 +112,12 @@ class Finding(
relatedDocIds.add(xcp.text())
}
}
CORRELATED_DOC_IDS_FIELD -> {
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
correlatedDocIds.add(xcp.text())
}
}
MONITOR_ID_FIELD -> monitorId = xcp.text()
MONITOR_NAME_FIELD -> monitorName = xcp.text()
INDEX_FIELD -> index = xcp.text()
Expand All @@ -123,6 +136,7 @@ class Finding(
return Finding(
id = id,
relatedDocIds = relatedDocIds,
correlatedDocIds = correlatedDocIds,
monitorId = monitorId,
monitorName = monitorName,
index = index,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ internal class GetFindingsResponseTests {
val finding1 = Finding(
"1",
listOf("doc1", "doc2", "doc3"),
listOf("doc1", "doc2", "doc3"),
"monitor_id1",
"monitor_name1",
"test_index1",
Expand All @@ -39,6 +40,7 @@ internal class GetFindingsResponseTests {
val finding2 = Finding(
"1",
listOf("doc21", "doc22"),
listOf("doc21", "doc22"),
"monitor_id2",
"monitor_name2",
"test_index2",
Expand Down