diff --git a/notifications/core/src/main/kotlin/org/opensearch/notifications/core/client/DestinationHttpClient.kt b/notifications/core/src/main/kotlin/org/opensearch/notifications/core/client/DestinationHttpClient.kt index e186eefe..b51d9cc7 100644 --- a/notifications/core/src/main/kotlin/org/opensearch/notifications/core/client/DestinationHttpClient.kt +++ b/notifications/core/src/main/kotlin/org/opensearch/notifications/core/client/DestinationHttpClient.kt @@ -15,6 +15,7 @@ import org.apache.hc.client5.http.impl.classic.CloseableHttpClient import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse import org.apache.hc.client5.http.impl.classic.HttpClientBuilder import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager +import org.apache.hc.client5.http.io.HttpClientConnectionManager import org.apache.hc.core5.http.HttpEntity import org.apache.hc.core5.http.HttpResponse import org.apache.hc.core5.http.io.entity.EntityUtils @@ -45,13 +46,21 @@ import kotlin.collections.HashSet class DestinationHttpClient { private val httpClient: CloseableHttpClient + private val httpClientDisableCookie: CloseableHttpClient constructor() { - this.httpClient = createHttpClient() + val connectionManager = PoolingHttpClientConnectionManager() + connectionManager.maxTotal = PluginSettings.maxConnections + connectionManager.defaultMaxPerRoute = PluginSettings.maxConnectionsPerRoute + + // share same connection pool + this.httpClient = createHttpClient(connectionManager, false) + this.httpClientDisableCookie = createHttpClient(connectionManager, true) } @OpenForTesting constructor(httpClient: CloseableHttpClient) { this.httpClient = httpClient + this.httpClientDisableCookie = httpClient } companion object { @@ -70,23 +79,24 @@ class DestinationHttpClient { ) ) - private fun createHttpClient(): CloseableHttpClient { + private fun createHttpClient(connectionManager: HttpClientConnectionManager, disableCookie: Boolean): CloseableHttpClient { val config: RequestConfig = RequestConfig.custom() .setConnectTimeout(Timeout.ofMilliseconds(PluginSettings.connectionTimeout.toLong())) .setConnectionRequestTimeout(Timeout.ofMilliseconds(PluginSettings.connectionTimeout.toLong())) .setResponseTimeout(Timeout.ofMilliseconds(PluginSettings.socketTimeout.toLong())) .build() - val connectionManager = PoolingHttpClientConnectionManager() - connectionManager.maxTotal = PluginSettings.maxConnections - connectionManager.defaultMaxPerRoute = PluginSettings.maxConnectionsPerRoute - return HttpClientBuilder.create() + val builder = HttpClientBuilder.create() .setDefaultRequestConfig(config) .setConnectionManager(connectionManager) .setRetryStrategy(DefaultHttpRequestRetryStrategy()) .useSystemProperties() .disableRedirectHandling() - .build() + + if (disableCookie) { + builder.disableCookieManagement() + } + return builder.build() } } @@ -125,7 +135,11 @@ class DestinationHttpClient { val entity = StringEntity(buildRequestBody(destination, message), StandardCharsets.UTF_8) httpRequest.entity = entity - return httpClient.execute(httpRequest) + return if (PluginSettings.disableHttpCookie) { + httpClientDisableCookie.execute(httpRequest) + } else { + httpClient.execute(httpRequest) + } } private fun constructHttpRequest(method: String, url: String): HttpUriRequestBase { diff --git a/notifications/core/src/main/kotlin/org/opensearch/notifications/core/setting/PluginSettings.kt b/notifications/core/src/main/kotlin/org/opensearch/notifications/core/setting/PluginSettings.kt index 75bbeb1b..74a94d77 100644 --- a/notifications/core/src/main/kotlin/org/opensearch/notifications/core/setting/PluginSettings.kt +++ b/notifications/core/src/main/kotlin/org/opensearch/notifications/core/setting/PluginSettings.kt @@ -35,6 +35,8 @@ internal object PluginSettings { */ private const val EMAIL_KEY_PREFIX = "$KEY_PREFIX.email" + private const val WEBHOOK_KEY_PREFIX = "$KEY_PREFIX.webhook" + /** * Legacy Email Destination setting prefix used by Alerting. * Defining this here to be used as a fallback for the Notification plugin setting to account for migrated Email Destinations. @@ -66,6 +68,11 @@ internal object PluginSettings { */ private const val MAX_CONNECTIONS_KEY = "$HTTP_CONNECTION_KEY_PREFIX.max_connections" + /** + * Settings key for disable http cookie for webhook + */ + private const val DISABLE_HTTP_COOKIE_KEY = "$WEBHOOK_KEY_PREFIX.disable_http_cookie" + /** * Settings Key prefix for max http connection per route. */ @@ -241,6 +248,12 @@ internal object PluginSettings { @Volatile var clusterName: String + /** + * flag to enable/disable cookie management for http connections of webhooks + */ + @Volatile + var disableHttpCookie: Boolean + /** * Destination Settings */ @@ -278,6 +291,7 @@ internal object PluginSettings { hostDenyList = settings?.getAsList(HOST_DENY_LIST_KEY, null) ?: DEFAULT_HOST_DENY_LIST clusterName = settings?.get(CLUSTER_NAME, DEFAULT_CLUSTER_NAME) ?: DEFAULT_CLUSTER_NAME destinationSettings = if (settings != null) loadDestinationSettings(settings) else DEFAULT_DESTINATION_SETTINGS + disableHttpCookie = settings?.getAsBoolean(DISABLE_HTTP_COOKIE_KEY, false) ?: false defaultSettings = mapOf( EMAIL_SIZE_LIMIT_KEY to emailSizeLimit.toString(DECIMAL_RADIX), @@ -309,6 +323,12 @@ internal object PluginSettings { NodeScope, Dynamic ) + val DISABLE_HTTP_COOKIE: Setting = Setting.boolSetting( + DISABLE_HTTP_COOKIE_KEY, + false, + NodeScope, Dynamic + ) + val MAX_CONNECTIONS_PER_ROUTE: Setting = Setting.intSetting( MAX_CONNECTIONS_PER_ROUTE_KEY, defaultSettings[MAX_CONNECTIONS_PER_ROUTE_KEY]!!.toInt(), @@ -412,7 +432,8 @@ internal object PluginSettings { TOOLTIP_SUPPORT, HOST_DENY_LIST, EMAIL_USERNAME, - EMAIL_PASSWORD + EMAIL_PASSWORD, + DISABLE_HTTP_COOKIE, ) } /** @@ -431,6 +452,7 @@ internal object PluginSettings { hostDenyList = HOST_DENY_LIST.get(clusterService.settings) destinationSettings = loadDestinationSettings(clusterService.settings) clusterName = clusterService.clusterName.value() + disableHttpCookie = DISABLE_HTTP_COOKIE.get(clusterService.settings) } /** @@ -454,6 +476,13 @@ internal object PluginSettings { log.debug("$LOG_PREFIX:$MAX_CONNECTIONS_KEY -autoUpdatedTo-> $clusterMaxConnections") maxConnections = clusterMaxConnections } + + val disableWebhookHttpCookie = clusterService.clusterSettings.get(DISABLE_HTTP_COOKIE) + if (disableWebhookHttpCookie != null) { + log.debug("$LOG_PREFIX:$DISABLE_HTTP_COOKIE -autoUpdatedTo-> $disableWebhookHttpCookie") + this.disableHttpCookie = disableWebhookHttpCookie + } + val clusterMaxConnectionsPerRoute = clusterService.clusterSettings.get(MAX_CONNECTIONS_PER_ROUTE) if (clusterMaxConnectionsPerRoute != null) { log.debug("$LOG_PREFIX:$MAX_CONNECTIONS_PER_ROUTE_KEY -autoUpdatedTo-> $clusterMaxConnectionsPerRoute") @@ -517,6 +546,10 @@ internal object PluginSettings { maxConnections = it log.info("$LOG_PREFIX:$MAX_CONNECTIONS_KEY -updatedTo-> $it") } + clusterService.clusterSettings.addSettingsUpdateConsumer(DISABLE_HTTP_COOKIE) { + disableHttpCookie = it + log.info("$LOG_PREFIX:${this.DISABLE_HTTP_COOKIE_KEY} -updatedTo-> $it") + } clusterService.clusterSettings.addSettingsUpdateConsumer(MAX_CONNECTIONS_PER_ROUTE) { maxConnectionsPerRoute = it log.info("$LOG_PREFIX:$MAX_CONNECTIONS_PER_ROUTE_KEY -updatedTo-> $it") @@ -595,5 +628,6 @@ internal object PluginSettings { allowedConfigTypes = DEFAULT_ALLOWED_CONFIG_TYPES tooltipSupport = DEFAULT_TOOLTIP_SUPPORT hostDenyList = DEFAULT_HOST_DENY_LIST + disableHttpCookie = false } } diff --git a/notifications/core/src/test/kotlin/org/opensearch/notifications/core/settings/PluginSettingsTests.kt b/notifications/core/src/test/kotlin/org/opensearch/notifications/core/settings/PluginSettingsTests.kt index 147516d9..d2984781 100644 --- a/notifications/core/src/test/kotlin/org/opensearch/notifications/core/settings/PluginSettingsTests.kt +++ b/notifications/core/src/test/kotlin/org/opensearch/notifications/core/settings/PluginSettingsTests.kt @@ -31,6 +31,7 @@ internal class PluginSettingsTests { private val httpMaxConnectionKey = "$httpKeyPrefix.max_connections" private val httpMaxConnectionPerRouteKey = "$httpKeyPrefix.max_connection_per_route" private val httpConnectionTimeoutKey = "$httpKeyPrefix.connection_timeout" + private val disableWebhookCookieKey = "$keyPrefix.webhook.disable_http_cookie" private val httpSocketTimeoutKey = "$httpKeyPrefix.socket_timeout" private val legacyAlertingHostDenyListKey = "opendistro.destination.host.deny_list" private val alertingHostDenyListKey = "plugins.destination.host.deny_list" @@ -93,7 +94,8 @@ internal class PluginSettingsTests { PluginSettings.SOCKET_TIMEOUT_MILLISECONDS, PluginSettings.ALLOWED_CONFIG_TYPES, PluginSettings.TOOLTIP_SUPPORT, - PluginSettings.HOST_DENY_LIST + PluginSettings.HOST_DENY_LIST, + PluginSettings.DISABLE_HTTP_COOKIE, ) ) ) @@ -134,6 +136,10 @@ internal class PluginSettingsTests { "opensearch", PluginSettings.clusterName ) + Assertions.assertEquals( + false, + PluginSettings.disableHttpCookie + ) } @Test @@ -148,6 +154,7 @@ internal class PluginSettingsTests { .putList(httpHostDenyListKey, listOf("sample")) .putList(allowedConfigTypeKey, listOf("slack")) .put(tooltipSupportKey, false) + .put(disableWebhookCookieKey, true) .put(clusterNameKey, "OpenSearch OsDomainNameUpdate") .build() @@ -166,6 +173,7 @@ internal class PluginSettingsTests { PluginSettings.ALLOWED_CONFIG_TYPES, PluginSettings.TOOLTIP_SUPPORT, PluginSettings.HOST_DENY_LIST, + PluginSettings.DISABLE_HTTP_COOKIE, ClusterName.CLUSTER_NAME_SETTING ) ) @@ -207,6 +215,10 @@ internal class PluginSettingsTests { "OpenSearch OsDomainNameUpdate", clusterService.clusterSettings.get(ClusterName.CLUSTER_NAME_SETTING).value() ) + Assertions.assertEquals( + true, + clusterService.clusterSettings.get(PluginSettings.DISABLE_HTTP_COOKIE) + ) } @Test @@ -227,6 +239,7 @@ internal class PluginSettingsTests { PluginSettings.ALLOWED_CONFIG_TYPES, PluginSettings.TOOLTIP_SUPPORT, PluginSettings.HOST_DENY_LIST, + PluginSettings.DISABLE_HTTP_COOKIE, ClusterName.CLUSTER_NAME_SETTING ) ) @@ -268,6 +281,10 @@ internal class PluginSettingsTests { "Cluster [opensearch]", clusterService.clusterSettings.get(ClusterName.CLUSTER_NAME_SETTING).toString() ) + Assertions.assertEquals( + false, + clusterService.clusterSettings.get(PluginSettings.DISABLE_HTTP_COOKIE) + ) } @Test @@ -295,6 +312,7 @@ internal class PluginSettingsTests { PluginSettings.LEGACY_ALERTING_HOST_DENY_LIST, PluginSettings.ALERTING_HOST_DENY_LIST, PluginSettings.HOST_DENY_LIST, + PluginSettings.DISABLE_HTTP_COOKIE, ClusterName.CLUSTER_NAME_SETTING ) ) @@ -330,6 +348,7 @@ internal class PluginSettingsTests { PluginSettings.LEGACY_ALERTING_HOST_DENY_LIST, PluginSettings.ALERTING_HOST_DENY_LIST, PluginSettings.HOST_DENY_LIST, + PluginSettings.DISABLE_HTTP_COOKIE, ClusterName.CLUSTER_NAME_SETTING ) ) @@ -364,6 +383,7 @@ internal class PluginSettingsTests { PluginSettings.LEGACY_ALERTING_HOST_DENY_LIST, PluginSettings.ALERTING_HOST_DENY_LIST, PluginSettings.HOST_DENY_LIST, + PluginSettings.DISABLE_HTTP_COOKIE, ClusterName.CLUSTER_NAME_SETTING ) ) diff --git a/notifications/notifications/build.gradle b/notifications/notifications/build.gradle index 14f686ce..dd0b00ef 100644 --- a/notifications/notifications/build.gradle +++ b/notifications/notifications/build.gradle @@ -233,6 +233,7 @@ integTest { if (usingRemoteCluster) { filter { excludeTestsMatching "org.opensearch.integtest.send.SendTestMessageWithMockServerIT" + excludeTestsMatching "org.opensearch.integtest.send.SendWithMockServerCookieIT" } } } diff --git a/notifications/notifications/src/test/kotlin/org/opensearch/integtest/send/SendWithMockServerCookieIT.kt b/notifications/notifications/src/test/kotlin/org/opensearch/integtest/send/SendWithMockServerCookieIT.kt new file mode 100644 index 00000000..a3373abf --- /dev/null +++ b/notifications/notifications/src/test/kotlin/org/opensearch/integtest/send/SendWithMockServerCookieIT.kt @@ -0,0 +1,170 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.integtest.send + +import com.google.gson.JsonArray +import com.google.gson.JsonObject +import com.sun.net.httpserver.HttpServer +import org.junit.AfterClass +import org.junit.Assert +import org.junit.BeforeClass +import org.opensearch.core.rest.RestStatus +import org.opensearch.integtest.PluginRestTestCase +import org.opensearch.integtest.jsonify +import org.opensearch.notifications.NotificationPlugin.Companion.PLUGIN_BASE_URI +import org.opensearch.rest.RestRequest +import java.net.InetAddress +import java.net.InetSocketAddress + +internal class SendWithMockServerCookieIT : PluginRestTestCase() { + + fun `test webhook return with cookie set`() { + val url = "http://${server.address.hostString}:${server.address.port}/webhook" + logger.info("webhook url = {}", url) + // Create webhook notification config + val createRequestJsonString = """ + { + "config":{ + "name":"test-webhook", + "description":"this is a sample config description", + "config_type":"webhook", + "is_enabled":true, + "webhook":{ + "url":"$url", + "header_params": { + "Content-type": "text/plain" + } + } + } + } + """.trimIndent() + val configId = createConfigWithRequestJsonString(createRequestJsonString) + Assert.assertNotNull(configId) + Thread.sleep(1000) + + // send test message + var sendResponse = executeRequest( + RestRequest.Method.POST.name, "$PLUGIN_BASE_URI/feature/test/$configId", "", RestStatus.OK.status + ) + + logger.info("sendResponse1={}", sendResponse) + + var deliveryStatus = (sendResponse.get("status_list") as JsonArray).get(0).asJsonObject + .get("delivery_status") as JsonObject + Assert.assertEquals(deliveryStatus.get("status_code").asString, "200") + + // send test message again with cookie set + sendResponse = executeRequest( + RestRequest.Method.POST.name, + "$PLUGIN_BASE_URI/feature/test/$configId", "", RestStatus.INTERNAL_SERVER_ERROR.status + ) + + logger.info("sendResponse2={}", sendResponse) + + val realResponse = sendResponse.get("error").asJsonObject.get("reason").asString + deliveryStatus = jsonify(realResponse).get("event_status_list").asJsonArray.get(0).asJsonObject + .get("delivery_status").asJsonObject + Assert.assertEquals( + deliveryStatus.get("status_text").asString, + "Failed to send webhook message Failed: Unauthorized" + ) + } + + fun `test webhook return with cookie disabled`() { + // update settings + executeRequest( + RestRequest.Method.PUT.name, "/_cluster/settings", + """ + { + "transient": { + "opensearch.notifications.core.webhook.disable_http_cookie": true + } + } + """.trimIndent(), + RestStatus.OK.status, + adminClient() + ) + + val url = "http://${server.address.hostString}:${server.address.port}/webhook" + logger.info("webhook url = {}", url) + // Create webhook notification config + val createRequestJsonString = """ + { + "config":{ + "name":"test-webhook", + "description":"this is a sample config description", + "config_type":"webhook", + "is_enabled":true, + "webhook":{ + "url":"$url", + "header_params": { + "Content-type": "text/plain" + } + } + } + } + """.trimIndent() + val configId = createConfigWithRequestJsonString(createRequestJsonString) + Assert.assertNotNull(configId) + Thread.sleep(1000) + + // send test message + var sendResponse = executeRequest( + RestRequest.Method.POST.name, "$PLUGIN_BASE_URI/feature/test/$configId", "", RestStatus.OK.status + ) + + logger.info("sendResponse1={}", sendResponse) + + var deliveryStatus = (sendResponse.get("status_list") as JsonArray).get(0).asJsonObject + .get("delivery_status") as JsonObject + Assert.assertEquals(deliveryStatus.get("status_code").asString, "200") + + // send test message again with cookie set + sendResponse = executeRequest( + RestRequest.Method.POST.name, + "$PLUGIN_BASE_URI/feature/test/$configId", "", RestStatus.OK.status + ) + + logger.info("sendResponse2={}", sendResponse) + } + + companion object { + private lateinit var server: HttpServer + + @JvmStatic + @BeforeClass + fun setupWebhook() { + server = HttpServer.create(InetSocketAddress(InetAddress.getLoopbackAddress(), 0), 0) + + server.createContext("/webhook") { + val response = "test response" + val cookies = it.requestHeaders["Cookie"] + if (cookies != null && cookies.size > 0 && cookies[0].contains("sessionId=123456789")) { + it.sendResponseHeaders(401, response.toByteArray().size.toLong()) + it.responseBody.write(response.toByteArray()) + it.close() + } else { + // Set a session cookie + val cookieValue = "sessionId=123456789; HttpOnly;" + // Add the "Set-Cookie" header to the response + it.responseHeaders.add("Set-Cookie", cookieValue) + + // Send a simple response + it.sendResponseHeaders(200, response.toByteArray().size.toLong()) + it.responseBody.write(response.toByteArray()) + it.close() + } + } + server.start() + } + + @JvmStatic + @AfterClass + fun stopMockServer() { + server.stop(1) + } + } +}