Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Support SNS FIFO queues #722

Merged
merged 1 commit into from
Jul 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@ import com.amazonaws.services.sns.model.KMSOptInRequiredException
import com.amazonaws.services.sns.model.KMSThrottlingException
import com.amazonaws.services.sns.model.NotFoundException
import com.amazonaws.services.sns.model.PlatformApplicationDisabledException
import com.amazonaws.services.sns.model.PublishRequest
import com.amazonaws.services.sns.model.PublishResult
import org.opensearch.core.rest.RestStatus
import org.opensearch.notifications.core.NotificationCorePlugin.Companion.LOG_PREFIX
import org.opensearch.notifications.core.credentials.SnsClientFactory
import org.opensearch.notifications.core.setting.PluginSettings
import org.opensearch.notifications.core.utils.logger
import org.opensearch.notifications.spi.model.DestinationMessageResponse
import org.opensearch.notifications.spi.model.MessageContent
import org.opensearch.notifications.spi.model.destination.SnsDestination
import java.util.UUID

/**
* This class handles the SNS connections to the given Destination.
Expand Down Expand Up @@ -118,6 +121,14 @@ class DestinationSnsClient(private val snsClientFactory: SnsClientFactory) {
*/
@Throws(Exception::class)
fun sendMessage(amazonSNS: AmazonSNS, destination: SnsDestination, message: MessageContent): PublishResult {
return amazonSNS.publish(destination.topicArn, message.textDescription, message.title)
val request: PublishRequest = PublishRequest()
.withTopicArn(destination.topicArn)
.withMessage(message.textDescription)
.withSubject(message.title)
if (destination.topicArn.endsWith(".fifo")) {
request.withMessageDeduplicationId(UUID.randomUUID().toString())
.withMessageGroupId(String.format("opensearch-%s", PluginSettings.clusterName))
}
return amazonSNS.publish(request)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.notifications.core.setting

import org.opensearch.bootstrap.BootstrapInfo
import org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.SecureSetting
import org.opensearch.common.settings.Setting
Expand Down Expand Up @@ -105,6 +106,16 @@ internal object PluginSettings {
*/
private const val TOOLTIP_SUPPORT_KEY = "$KEY_PREFIX.tooltip_support"

/**
* Setting to provide cluster name, which is <AWS-account-number:AWS-domain-name> on the managed service
*/
private const val CLUSTER_NAME = "cluster.name"

/**
* Default cluster name if it cannot be retrieved.
*/
private const val DEFAULT_CLUSTER_NAME = "OpenSearch:DefaultClusterName"

/**
* Default email size limit as 10MB.
*/
Expand Down Expand Up @@ -223,6 +234,12 @@ internal object PluginSettings {
@Volatile
var hostDenyList: List<String>

/**
* cluster name
*/
@Volatile
var clusterName: String

/**
* Destination Settings
*/
Expand Down Expand Up @@ -258,6 +275,7 @@ internal object PluginSettings {
allowedConfigTypes = settings?.getAsList(ALLOWED_CONFIG_TYPE_KEY, null) ?: DEFAULT_ALLOWED_CONFIG_TYPES
tooltipSupport = settings?.getAsBoolean(TOOLTIP_SUPPORT_KEY, true) ?: DEFAULT_TOOLTIP_SUPPORT
hostDenyList = settings?.getAsList(HOST_DENY_LIST_KEY, null) ?: DEFAULT_HOST_DENY_LIST
clusterName = settings?.get(CLUSTER_NAME, DEFAULT_CLUSTER_NAME) ?: DEFAULT_CLUSTER_NAME
destinationSettings = if (settings != null) loadDestinationSettings(settings) else DEFAULT_DESTINATION_SETTINGS

defaultSettings = mapOf(
Expand Down Expand Up @@ -411,6 +429,7 @@ internal object PluginSettings {
tooltipSupport = TOOLTIP_SUPPORT.get(clusterService.settings)
hostDenyList = HOST_DENY_LIST.get(clusterService.settings)
destinationSettings = loadDestinationSettings(clusterService.settings)
clusterName = clusterService.clusterName.value()
}

/**
Expand Down Expand Up @@ -464,6 +483,11 @@ internal object PluginSettings {
log.debug("$LOG_PREFIX:$HOST_DENY_LIST_KEY -autoUpdatedTo-> $clusterHostDenyList")
hostDenyList = clusterHostDenyList
}
val clusterClusterName = clusterService.clusterName
if (clusterClusterName != null) {
log.debug("$LOG_PREFIX:$CLUSTER_NAME_SETTING -autoUpdatedTo-> $clusterClusterName")
clusterName = clusterClusterName.value()
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.mockito.Mockito.mock
import org.opensearch.cluster.ClusterName
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.ClusterSettings
import org.opensearch.common.settings.Settings
Expand All @@ -36,6 +37,9 @@ internal class PluginSettingsTests {
private val httpHostDenyListKey = "$httpKeyPrefix.host_deny_list"
private val allowedConfigTypeKey = "$keyPrefix.allowed_config_types"
private val tooltipSupportKey = "$keyPrefix.tooltip_support"
private val clusterNameKey = "cluster.name"

private val defaultClusterName = ClusterName.DEFAULT

private val defaultSettings = Settings.builder()
.put(emailSizeLimitKey, 10000000)
Expand All @@ -59,6 +63,7 @@ internal class PluginSettingsTests {
)
)
.put(tooltipSupportKey, true)
.put(clusterNameKey, "OpenSearch OsDomainName")
.build()

@BeforeEach
Expand Down Expand Up @@ -124,6 +129,10 @@ internal class PluginSettingsTests {
defaultSettings[httpHostDenyListKey],
PluginSettings.hostDenyList.toString()
)
Assertions.assertEquals(
"opensearch",
PluginSettings.clusterName
)
}

@Test
Expand All @@ -138,9 +147,11 @@ internal class PluginSettingsTests {
.putList(httpHostDenyListKey, listOf("sample"))
.putList(allowedConfigTypeKey, listOf("slack"))
.put(tooltipSupportKey, false)
.put(clusterNameKey, "OpenSearch OsDomainNameUpdate")
.build()

whenever(clusterService.settings).thenReturn(defaultSettings)
whenever(clusterService.clusterName).thenReturn(defaultClusterName)
whenever(clusterService.clusterSettings).thenReturn(
ClusterSettings(
clusterSettings,
Expand All @@ -153,7 +164,8 @@ internal class PluginSettingsTests {
PluginSettings.SOCKET_TIMEOUT_MILLISECONDS,
PluginSettings.ALLOWED_CONFIG_TYPES,
PluginSettings.TOOLTIP_SUPPORT,
PluginSettings.HOST_DENY_LIST
PluginSettings.HOST_DENY_LIST,
ClusterName.CLUSTER_NAME_SETTING
)
)
)
Expand Down Expand Up @@ -190,12 +202,17 @@ internal class PluginSettingsTests {
false,
clusterService.clusterSettings.get(PluginSettings.TOOLTIP_SUPPORT)
)
Assertions.assertEquals(
"OpenSearch OsDomainNameUpdate",
clusterService.clusterSettings.get(ClusterName.CLUSTER_NAME_SETTING).value()
)
}

@Test
fun `test update settings should fall back to node settings if cluster settings is not available`() {
val clusterSettings = Settings.builder().build()
whenever(clusterService.settings).thenReturn(defaultSettings)
whenever(clusterService.clusterName).thenReturn(defaultClusterName)
whenever(clusterService.clusterSettings).thenReturn(
ClusterSettings(
clusterSettings,
Expand All @@ -208,7 +225,8 @@ internal class PluginSettingsTests {
PluginSettings.SOCKET_TIMEOUT_MILLISECONDS,
PluginSettings.ALLOWED_CONFIG_TYPES,
PluginSettings.TOOLTIP_SUPPORT,
PluginSettings.HOST_DENY_LIST
PluginSettings.HOST_DENY_LIST,
ClusterName.CLUSTER_NAME_SETTING
)
)
)
Expand Down Expand Up @@ -245,6 +263,10 @@ internal class PluginSettingsTests {
defaultSettings[tooltipSupportKey],
clusterService.clusterSettings.get(PluginSettings.TOOLTIP_SUPPORT).toString()
)
Assertions.assertEquals(
"Cluster [opensearch]",
clusterService.clusterSettings.get(ClusterName.CLUSTER_NAME_SETTING).toString()
)
}

@Test
Expand All @@ -256,6 +278,7 @@ internal class PluginSettingsTests {
.build()

whenever(clusterService.settings).thenReturn(defaultSettings)
whenever(clusterService.clusterName).thenReturn(defaultClusterName)
whenever(clusterService.clusterSettings).thenReturn(
ClusterSettings(
clusterSettings,
Expand All @@ -270,7 +293,8 @@ internal class PluginSettingsTests {
PluginSettings.TOOLTIP_SUPPORT,
PluginSettings.LEGACY_ALERTING_HOST_DENY_LIST,
PluginSettings.ALERTING_HOST_DENY_LIST,
PluginSettings.HOST_DENY_LIST
PluginSettings.HOST_DENY_LIST,
ClusterName.CLUSTER_NAME_SETTING
)
)
)
Expand All @@ -289,6 +313,7 @@ internal class PluginSettingsTests {
.build()

whenever(clusterService.settings).thenReturn(defaultSettings)
whenever(clusterService.clusterName).thenReturn(defaultClusterName)
whenever(clusterService.clusterSettings).thenReturn(
ClusterSettings(
clusterSettings,
Expand All @@ -303,7 +328,8 @@ internal class PluginSettingsTests {
PluginSettings.TOOLTIP_SUPPORT,
PluginSettings.LEGACY_ALERTING_HOST_DENY_LIST,
PluginSettings.ALERTING_HOST_DENY_LIST,
PluginSettings.HOST_DENY_LIST
PluginSettings.HOST_DENY_LIST,
ClusterName.CLUSTER_NAME_SETTING
)
)
)
Expand All @@ -321,6 +347,7 @@ internal class PluginSettingsTests {
.build()

whenever(clusterService.settings).thenReturn(defaultSettings)
whenever(clusterService.clusterName).thenReturn(defaultClusterName)
whenever(clusterService.clusterSettings).thenReturn(
ClusterSettings(
clusterSettings,
Expand All @@ -335,7 +362,8 @@ internal class PluginSettingsTests {
PluginSettings.TOOLTIP_SUPPORT,
PluginSettings.LEGACY_ALERTING_HOST_DENY_LIST,
PluginSettings.ALERTING_HOST_DENY_LIST,
PluginSettings.HOST_DENY_LIST
PluginSettings.HOST_DENY_LIST,
ClusterName.CLUSTER_NAME_SETTING
)
)
)
Expand Down