From 74de36015577741e79cbd4a60b3fe51fbf310280 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Wed, 26 Jul 2023 14:12:26 +0800 Subject: [PATCH] Support SNS FIFO queues (#716) (#722) * Support SNS FIFO queues * use clusterService.clusterName when applicable * fix unit tests --------- (cherry picked from commit 58a7578e0ca7a599465f84fd4845f8fcd0a72b8b) Signed-off-by: Ashish Agrawal Signed-off-by: github-actions[bot] Co-authored-by: github-actions[bot] Co-authored-by: Hailong Cui --- .../core/client/DestinationSnsClient.kt | 13 ++++++- .../core/setting/PluginSettings.kt | 24 ++++++++++++ .../core/settings/PluginSettingsTests.kt | 38 ++++++++++++++++--- 3 files changed, 69 insertions(+), 6 deletions(-) diff --git a/notifications/core/src/main/kotlin/org/opensearch/notifications/core/client/DestinationSnsClient.kt b/notifications/core/src/main/kotlin/org/opensearch/notifications/core/client/DestinationSnsClient.kt index 046d6e8f..d49bc9d2 100644 --- a/notifications/core/src/main/kotlin/org/opensearch/notifications/core/client/DestinationSnsClient.kt +++ b/notifications/core/src/main/kotlin/org/opensearch/notifications/core/client/DestinationSnsClient.kt @@ -23,14 +23,17 @@ import com.amazonaws.services.sns.model.KMSOptInRequiredException import com.amazonaws.services.sns.model.KMSThrottlingException import com.amazonaws.services.sns.model.NotFoundException import com.amazonaws.services.sns.model.PlatformApplicationDisabledException +import com.amazonaws.services.sns.model.PublishRequest import com.amazonaws.services.sns.model.PublishResult import org.opensearch.core.rest.RestStatus import org.opensearch.notifications.core.NotificationCorePlugin.Companion.LOG_PREFIX import org.opensearch.notifications.core.credentials.SnsClientFactory +import org.opensearch.notifications.core.setting.PluginSettings import org.opensearch.notifications.core.utils.logger import org.opensearch.notifications.spi.model.DestinationMessageResponse import org.opensearch.notifications.spi.model.MessageContent import org.opensearch.notifications.spi.model.destination.SnsDestination +import java.util.UUID /** * This class handles the SNS connections to the given Destination. @@ -118,6 +121,14 @@ class DestinationSnsClient(private val snsClientFactory: SnsClientFactory) { */ @Throws(Exception::class) fun sendMessage(amazonSNS: AmazonSNS, destination: SnsDestination, message: MessageContent): PublishResult { - return amazonSNS.publish(destination.topicArn, message.textDescription, message.title) + val request: PublishRequest = PublishRequest() + .withTopicArn(destination.topicArn) + .withMessage(message.textDescription) + .withSubject(message.title) + if (destination.topicArn.endsWith(".fifo")) { + request.withMessageDeduplicationId(UUID.randomUUID().toString()) + .withMessageGroupId(String.format("opensearch-%s", PluginSettings.clusterName)) + } + return amazonSNS.publish(request) } } diff --git a/notifications/core/src/main/kotlin/org/opensearch/notifications/core/setting/PluginSettings.kt b/notifications/core/src/main/kotlin/org/opensearch/notifications/core/setting/PluginSettings.kt index 70401dee..d965f7d4 100644 --- a/notifications/core/src/main/kotlin/org/opensearch/notifications/core/setting/PluginSettings.kt +++ b/notifications/core/src/main/kotlin/org/opensearch/notifications/core/setting/PluginSettings.kt @@ -6,6 +6,7 @@ package org.opensearch.notifications.core.setting import org.opensearch.bootstrap.BootstrapInfo +import org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.SecureSetting import org.opensearch.common.settings.Setting @@ -105,6 +106,16 @@ internal object PluginSettings { */ private const val TOOLTIP_SUPPORT_KEY = "$KEY_PREFIX.tooltip_support" + /** + * Setting to provide cluster name, which is on the managed service + */ + private const val CLUSTER_NAME = "cluster.name" + + /** + * Default cluster name if it cannot be retrieved. + */ + private const val DEFAULT_CLUSTER_NAME = "OpenSearch:DefaultClusterName" + /** * Default email size limit as 10MB. */ @@ -223,6 +234,12 @@ internal object PluginSettings { @Volatile var hostDenyList: List + /** + * cluster name + */ + @Volatile + var clusterName: String + /** * Destination Settings */ @@ -258,6 +275,7 @@ internal object PluginSettings { allowedConfigTypes = settings?.getAsList(ALLOWED_CONFIG_TYPE_KEY, null) ?: DEFAULT_ALLOWED_CONFIG_TYPES tooltipSupport = settings?.getAsBoolean(TOOLTIP_SUPPORT_KEY, true) ?: DEFAULT_TOOLTIP_SUPPORT hostDenyList = settings?.getAsList(HOST_DENY_LIST_KEY, null) ?: DEFAULT_HOST_DENY_LIST + clusterName = settings?.get(CLUSTER_NAME, DEFAULT_CLUSTER_NAME) ?: DEFAULT_CLUSTER_NAME destinationSettings = if (settings != null) loadDestinationSettings(settings) else DEFAULT_DESTINATION_SETTINGS defaultSettings = mapOf( @@ -411,6 +429,7 @@ internal object PluginSettings { tooltipSupport = TOOLTIP_SUPPORT.get(clusterService.settings) hostDenyList = HOST_DENY_LIST.get(clusterService.settings) destinationSettings = loadDestinationSettings(clusterService.settings) + clusterName = clusterService.clusterName.value() } /** @@ -464,6 +483,11 @@ internal object PluginSettings { log.debug("$LOG_PREFIX:$HOST_DENY_LIST_KEY -autoUpdatedTo-> $clusterHostDenyList") hostDenyList = clusterHostDenyList } + val clusterClusterName = clusterService.clusterName + if (clusterClusterName != null) { + log.debug("$LOG_PREFIX:$CLUSTER_NAME_SETTING -autoUpdatedTo-> $clusterClusterName") + clusterName = clusterClusterName.value() + } } /** diff --git a/notifications/core/src/test/kotlin/org/opensearch/notifications/core/settings/PluginSettingsTests.kt b/notifications/core/src/test/kotlin/org/opensearch/notifications/core/settings/PluginSettingsTests.kt index f4442bf6..098da401 100644 --- a/notifications/core/src/test/kotlin/org/opensearch/notifications/core/settings/PluginSettingsTests.kt +++ b/notifications/core/src/test/kotlin/org/opensearch/notifications/core/settings/PluginSettingsTests.kt @@ -12,6 +12,7 @@ import org.junit.jupiter.api.Assertions import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test import org.mockito.Mockito.mock +import org.opensearch.cluster.ClusterName import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.ClusterSettings import org.opensearch.common.settings.Settings @@ -36,6 +37,9 @@ internal class PluginSettingsTests { private val httpHostDenyListKey = "$httpKeyPrefix.host_deny_list" private val allowedConfigTypeKey = "$keyPrefix.allowed_config_types" private val tooltipSupportKey = "$keyPrefix.tooltip_support" + private val clusterNameKey = "cluster.name" + + private val defaultClusterName = ClusterName.DEFAULT private val defaultSettings = Settings.builder() .put(emailSizeLimitKey, 10000000) @@ -59,6 +63,7 @@ internal class PluginSettingsTests { ) ) .put(tooltipSupportKey, true) + .put(clusterNameKey, "OpenSearch OsDomainName") .build() @BeforeEach @@ -124,6 +129,10 @@ internal class PluginSettingsTests { defaultSettings[httpHostDenyListKey], PluginSettings.hostDenyList.toString() ) + Assertions.assertEquals( + "opensearch", + PluginSettings.clusterName + ) } @Test @@ -138,9 +147,11 @@ internal class PluginSettingsTests { .putList(httpHostDenyListKey, listOf("sample")) .putList(allowedConfigTypeKey, listOf("slack")) .put(tooltipSupportKey, false) + .put(clusterNameKey, "OpenSearch OsDomainNameUpdate") .build() whenever(clusterService.settings).thenReturn(defaultSettings) + whenever(clusterService.clusterName).thenReturn(defaultClusterName) whenever(clusterService.clusterSettings).thenReturn( ClusterSettings( clusterSettings, @@ -153,7 +164,8 @@ internal class PluginSettingsTests { PluginSettings.SOCKET_TIMEOUT_MILLISECONDS, PluginSettings.ALLOWED_CONFIG_TYPES, PluginSettings.TOOLTIP_SUPPORT, - PluginSettings.HOST_DENY_LIST + PluginSettings.HOST_DENY_LIST, + ClusterName.CLUSTER_NAME_SETTING ) ) ) @@ -190,12 +202,17 @@ internal class PluginSettingsTests { false, clusterService.clusterSettings.get(PluginSettings.TOOLTIP_SUPPORT) ) + Assertions.assertEquals( + "OpenSearch OsDomainNameUpdate", + clusterService.clusterSettings.get(ClusterName.CLUSTER_NAME_SETTING).value() + ) } @Test fun `test update settings should fall back to node settings if cluster settings is not available`() { val clusterSettings = Settings.builder().build() whenever(clusterService.settings).thenReturn(defaultSettings) + whenever(clusterService.clusterName).thenReturn(defaultClusterName) whenever(clusterService.clusterSettings).thenReturn( ClusterSettings( clusterSettings, @@ -208,7 +225,8 @@ internal class PluginSettingsTests { PluginSettings.SOCKET_TIMEOUT_MILLISECONDS, PluginSettings.ALLOWED_CONFIG_TYPES, PluginSettings.TOOLTIP_SUPPORT, - PluginSettings.HOST_DENY_LIST + PluginSettings.HOST_DENY_LIST, + ClusterName.CLUSTER_NAME_SETTING ) ) ) @@ -245,6 +263,10 @@ internal class PluginSettingsTests { defaultSettings[tooltipSupportKey], clusterService.clusterSettings.get(PluginSettings.TOOLTIP_SUPPORT).toString() ) + Assertions.assertEquals( + "Cluster [opensearch]", + clusterService.clusterSettings.get(ClusterName.CLUSTER_NAME_SETTING).toString() + ) } @Test @@ -256,6 +278,7 @@ internal class PluginSettingsTests { .build() whenever(clusterService.settings).thenReturn(defaultSettings) + whenever(clusterService.clusterName).thenReturn(defaultClusterName) whenever(clusterService.clusterSettings).thenReturn( ClusterSettings( clusterSettings, @@ -270,7 +293,8 @@ internal class PluginSettingsTests { PluginSettings.TOOLTIP_SUPPORT, PluginSettings.LEGACY_ALERTING_HOST_DENY_LIST, PluginSettings.ALERTING_HOST_DENY_LIST, - PluginSettings.HOST_DENY_LIST + PluginSettings.HOST_DENY_LIST, + ClusterName.CLUSTER_NAME_SETTING ) ) ) @@ -289,6 +313,7 @@ internal class PluginSettingsTests { .build() whenever(clusterService.settings).thenReturn(defaultSettings) + whenever(clusterService.clusterName).thenReturn(defaultClusterName) whenever(clusterService.clusterSettings).thenReturn( ClusterSettings( clusterSettings, @@ -303,7 +328,8 @@ internal class PluginSettingsTests { PluginSettings.TOOLTIP_SUPPORT, PluginSettings.LEGACY_ALERTING_HOST_DENY_LIST, PluginSettings.ALERTING_HOST_DENY_LIST, - PluginSettings.HOST_DENY_LIST + PluginSettings.HOST_DENY_LIST, + ClusterName.CLUSTER_NAME_SETTING ) ) ) @@ -321,6 +347,7 @@ internal class PluginSettingsTests { .build() whenever(clusterService.settings).thenReturn(defaultSettings) + whenever(clusterService.clusterName).thenReturn(defaultClusterName) whenever(clusterService.clusterSettings).thenReturn( ClusterSettings( clusterSettings, @@ -335,7 +362,8 @@ internal class PluginSettingsTests { PluginSettings.TOOLTIP_SUPPORT, PluginSettings.LEGACY_ALERTING_HOST_DENY_LIST, PluginSettings.ALERTING_HOST_DENY_LIST, - PluginSettings.HOST_DENY_LIST + PluginSettings.HOST_DENY_LIST, + ClusterName.CLUSTER_NAME_SETTING ) ) )