Skip to content

Commit

Permalink
Support SNS FIFO queues (#716) (#725)
Browse files Browse the repository at this point in the history
* Support SNS FIFO queues



* use clusterService.clusterName when applicable



* fix unit tests



---------

Signed-off-by: Ashish Agrawal <[email protected]>
Co-authored-by: Hailong Cui <[email protected]>
  • Loading branch information
lezzago and Hailong-am authored Jul 27, 2023
1 parent b138592 commit ecdb9a2
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,17 @@ import com.amazonaws.services.sns.model.KMSOptInRequiredException
import com.amazonaws.services.sns.model.KMSThrottlingException
import com.amazonaws.services.sns.model.NotFoundException
import com.amazonaws.services.sns.model.PlatformApplicationDisabledException
import com.amazonaws.services.sns.model.PublishRequest
import com.amazonaws.services.sns.model.PublishResult
import org.opensearch.notifications.core.NotificationCorePlugin.Companion.LOG_PREFIX
import org.opensearch.notifications.core.credentials.SnsClientFactory
import org.opensearch.notifications.core.setting.PluginSettings
import org.opensearch.notifications.core.utils.logger
import org.opensearch.notifications.spi.model.DestinationMessageResponse
import org.opensearch.notifications.spi.model.MessageContent
import org.opensearch.notifications.spi.model.destination.SnsDestination
import org.opensearch.rest.RestStatus
import java.util.UUID

/**
* This class handles the SNS connections to the given Destination.
Expand Down Expand Up @@ -118,6 +121,14 @@ class DestinationSnsClient(private val snsClientFactory: SnsClientFactory) {
*/
@Throws(Exception::class)
fun sendMessage(amazonSNS: AmazonSNS, destination: SnsDestination, message: MessageContent): PublishResult {
return amazonSNS.publish(destination.topicArn, message.textDescription, message.title)
val request: PublishRequest = PublishRequest()
.withTopicArn(destination.topicArn)
.withMessage(message.textDescription)
.withSubject(message.title)
if (destination.topicArn.endsWith(".fifo")) {
request.withMessageDeduplicationId(UUID.randomUUID().toString())
.withMessageGroupId(String.format("opensearch-%s", PluginSettings.clusterName))
}
return amazonSNS.publish(request)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.notifications.core.setting

import org.opensearch.bootstrap.BootstrapInfo
import org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.SecureSetting
import org.opensearch.common.settings.SecureString
Expand Down Expand Up @@ -105,6 +106,16 @@ internal object PluginSettings {
*/
private const val TOOLTIP_SUPPORT_KEY = "$KEY_PREFIX.tooltip_support"

/**
* Setting to provide cluster name, which is <AWS-account-number:AWS-domain-name> on the managed service
*/
private const val CLUSTER_NAME = "cluster.name"

/**
* Default cluster name if it cannot be retrieved.
*/
private const val DEFAULT_CLUSTER_NAME = "OpenSearch:DefaultClusterName"

/**
* Default email size limit as 10MB.
*/
Expand Down Expand Up @@ -223,6 +234,12 @@ internal object PluginSettings {
@Volatile
var hostDenyList: List<String>

/**
* cluster name
*/
@Volatile
var clusterName: String

/**
* Destination Settings
*/
Expand Down Expand Up @@ -258,6 +275,7 @@ internal object PluginSettings {
allowedConfigTypes = settings?.getAsList(ALLOWED_CONFIG_TYPE_KEY, null) ?: DEFAULT_ALLOWED_CONFIG_TYPES
tooltipSupport = settings?.getAsBoolean(TOOLTIP_SUPPORT_KEY, true) ?: DEFAULT_TOOLTIP_SUPPORT
hostDenyList = settings?.getAsList(HOST_DENY_LIST_KEY, null) ?: DEFAULT_HOST_DENY_LIST
clusterName = settings?.get(CLUSTER_NAME, DEFAULT_CLUSTER_NAME) ?: DEFAULT_CLUSTER_NAME
destinationSettings = if (settings != null) loadDestinationSettings(settings) else DEFAULT_DESTINATION_SETTINGS

defaultSettings = mapOf(
Expand Down Expand Up @@ -411,6 +429,7 @@ internal object PluginSettings {
tooltipSupport = TOOLTIP_SUPPORT.get(clusterService.settings)
hostDenyList = HOST_DENY_LIST.get(clusterService.settings)
destinationSettings = loadDestinationSettings(clusterService.settings)
clusterName = clusterService.clusterName.value()
}

/**
Expand Down Expand Up @@ -464,6 +483,11 @@ internal object PluginSettings {
log.debug("$LOG_PREFIX:$HOST_DENY_LIST_KEY -autoUpdatedTo-> $clusterHostDenyList")
hostDenyList = clusterHostDenyList
}
val clusterClusterName = clusterService.clusterName
if (clusterClusterName != null) {
log.debug("$LOG_PREFIX:$CLUSTER_NAME_SETTING -autoUpdatedTo-> $clusterClusterName")
clusterName = clusterClusterName.value()
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.mockito.Mockito.mock
import org.opensearch.cluster.ClusterName
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.ClusterSettings
import org.opensearch.common.settings.Settings
Expand All @@ -36,6 +37,9 @@ internal class PluginSettingsTests {
private val httpHostDenyListKey = "$httpKeyPrefix.host_deny_list"
private val allowedConfigTypeKey = "$keyPrefix.allowed_config_types"
private val tooltipSupportKey = "$keyPrefix.tooltip_support"
private val clusterNameKey = "cluster.name"

private val defaultClusterName = ClusterName.DEFAULT

private val defaultSettings = Settings.builder()
.put(emailSizeLimitKey, 10000000)
Expand All @@ -59,6 +63,7 @@ internal class PluginSettingsTests {
)
)
.put(tooltipSupportKey, true)
.put(clusterNameKey, "OpenSearch OsDomainName")
.build()

@BeforeEach
Expand Down Expand Up @@ -124,6 +129,10 @@ internal class PluginSettingsTests {
defaultSettings[httpHostDenyListKey],
PluginSettings.hostDenyList.toString()
)
Assertions.assertEquals(
"opensearch",
PluginSettings.clusterName
)
}

@Test
Expand All @@ -138,9 +147,11 @@ internal class PluginSettingsTests {
.putList(httpHostDenyListKey, listOf("sample"))
.putList(allowedConfigTypeKey, listOf("slack"))
.put(tooltipSupportKey, false)
.put(clusterNameKey, "OpenSearch OsDomainNameUpdate")
.build()

whenever(clusterService.settings).thenReturn(defaultSettings)
whenever(clusterService.clusterName).thenReturn(defaultClusterName)
whenever(clusterService.clusterSettings).thenReturn(
ClusterSettings(
clusterSettings,
Expand All @@ -153,7 +164,8 @@ internal class PluginSettingsTests {
PluginSettings.SOCKET_TIMEOUT_MILLISECONDS,
PluginSettings.ALLOWED_CONFIG_TYPES,
PluginSettings.TOOLTIP_SUPPORT,
PluginSettings.HOST_DENY_LIST
PluginSettings.HOST_DENY_LIST,
ClusterName.CLUSTER_NAME_SETTING
)
)
)
Expand Down Expand Up @@ -190,12 +202,17 @@ internal class PluginSettingsTests {
false,
clusterService.clusterSettings.get(PluginSettings.TOOLTIP_SUPPORT)
)
Assertions.assertEquals(
"OpenSearch OsDomainNameUpdate",
clusterService.clusterSettings.get(ClusterName.CLUSTER_NAME_SETTING).value()
)
}

@Test
fun `test update settings should fall back to node settings if cluster settings is not available`() {
val clusterSettings = Settings.builder().build()
whenever(clusterService.settings).thenReturn(defaultSettings)
whenever(clusterService.clusterName).thenReturn(defaultClusterName)
whenever(clusterService.clusterSettings).thenReturn(
ClusterSettings(
clusterSettings,
Expand All @@ -208,7 +225,8 @@ internal class PluginSettingsTests {
PluginSettings.SOCKET_TIMEOUT_MILLISECONDS,
PluginSettings.ALLOWED_CONFIG_TYPES,
PluginSettings.TOOLTIP_SUPPORT,
PluginSettings.HOST_DENY_LIST
PluginSettings.HOST_DENY_LIST,
ClusterName.CLUSTER_NAME_SETTING
)
)
)
Expand Down Expand Up @@ -245,6 +263,10 @@ internal class PluginSettingsTests {
defaultSettings[tooltipSupportKey],
clusterService.clusterSettings.get(PluginSettings.TOOLTIP_SUPPORT).toString()
)
Assertions.assertEquals(
"Cluster [opensearch]",
clusterService.clusterSettings.get(ClusterName.CLUSTER_NAME_SETTING).toString()
)
}

@Test
Expand All @@ -256,6 +278,7 @@ internal class PluginSettingsTests {
.build()

whenever(clusterService.settings).thenReturn(defaultSettings)
whenever(clusterService.clusterName).thenReturn(defaultClusterName)
whenever(clusterService.clusterSettings).thenReturn(
ClusterSettings(
clusterSettings,
Expand All @@ -270,7 +293,8 @@ internal class PluginSettingsTests {
PluginSettings.TOOLTIP_SUPPORT,
PluginSettings.LEGACY_ALERTING_HOST_DENY_LIST,
PluginSettings.ALERTING_HOST_DENY_LIST,
PluginSettings.HOST_DENY_LIST
PluginSettings.HOST_DENY_LIST,
ClusterName.CLUSTER_NAME_SETTING
)
)
)
Expand All @@ -289,6 +313,7 @@ internal class PluginSettingsTests {
.build()

whenever(clusterService.settings).thenReturn(defaultSettings)
whenever(clusterService.clusterName).thenReturn(defaultClusterName)
whenever(clusterService.clusterSettings).thenReturn(
ClusterSettings(
clusterSettings,
Expand All @@ -303,7 +328,8 @@ internal class PluginSettingsTests {
PluginSettings.TOOLTIP_SUPPORT,
PluginSettings.LEGACY_ALERTING_HOST_DENY_LIST,
PluginSettings.ALERTING_HOST_DENY_LIST,
PluginSettings.HOST_DENY_LIST
PluginSettings.HOST_DENY_LIST,
ClusterName.CLUSTER_NAME_SETTING
)
)
)
Expand All @@ -321,6 +347,7 @@ internal class PluginSettingsTests {
.build()

whenever(clusterService.settings).thenReturn(defaultSettings)
whenever(clusterService.clusterName).thenReturn(defaultClusterName)
whenever(clusterService.clusterSettings).thenReturn(
ClusterSettings(
clusterSettings,
Expand All @@ -335,7 +362,8 @@ internal class PluginSettingsTests {
PluginSettings.TOOLTIP_SUPPORT,
PluginSettings.LEGACY_ALERTING_HOST_DENY_LIST,
PluginSettings.ALERTING_HOST_DENY_LIST,
PluginSettings.HOST_DENY_LIST
PluginSettings.HOST_DENY_LIST,
ClusterName.CLUSTER_NAME_SETTING
)
)
)
Expand Down

0 comments on commit ecdb9a2

Please sign in to comment.