From 7e25f011ed4104335be8a83d90ad60cf520c3692 Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Wed, 28 Sep 2022 17:44:59 +0530 Subject: [PATCH 1/5] fix: prom sink changed to BasicHttpClientConnectionManager for idle connection eviction --- docs/docs/sinks/prometheus-sink.md | 8 ++++++++ src/main/java/io/odpf/firehose/config/PromSinkConfig.java | 4 ++++ .../io/odpf/firehose/sink/prometheus/PromSinkFactory.java | 8 ++++++-- 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/docs/docs/sinks/prometheus-sink.md b/docs/docs/sinks/prometheus-sink.md index 0c9caffb3..d7cba5339 100644 --- a/docs/docs/sinks/prometheus-sink.md +++ b/docs/docs/sinks/prometheus-sink.md @@ -17,6 +17,14 @@ Defines the connection timeout for the request in millis. - Type: `required` - Default value: `10000` +### `SINK_PROM_MAX_CONNECTIONS` + +Defines the maximum number of HTTP connections with Prometheus. + +- Example value: `5` +- Type: `required` +- Default value: `5` + ### `SINK_PROM_RETRY_STATUS_CODE_RANGES` Defines the range of HTTP status codes for which retry will be attempted. diff --git a/src/main/java/io/odpf/firehose/config/PromSinkConfig.java b/src/main/java/io/odpf/firehose/config/PromSinkConfig.java index e62b07a28..50754def1 100644 --- a/src/main/java/io/odpf/firehose/config/PromSinkConfig.java +++ b/src/main/java/io/odpf/firehose/config/PromSinkConfig.java @@ -22,6 +22,10 @@ public interface PromSinkConfig extends AppConfig { @DefaultValue("10000") Integer getSinkPromRequestTimeoutMs(); + @Key("SINK_PROM_MAX_CONNECTIONS") + @DefaultValue("5") + Integer getSinkPromMaxConnections(); + @Key("SINK_PROM_SERVICE_URL") String getSinkPromServiceUrl(); diff --git a/src/main/java/io/odpf/firehose/sink/prometheus/PromSinkFactory.java b/src/main/java/io/odpf/firehose/sink/prometheus/PromSinkFactory.java index 22628d2e0..76e9cd15b 100644 --- a/src/main/java/io/odpf/firehose/sink/prometheus/PromSinkFactory.java +++ b/src/main/java/io/odpf/firehose/sink/prometheus/PromSinkFactory.java @@ -13,7 +13,7 @@ import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.impl.client.HttpClients; -import org.apache.http.impl.conn.BasicHttpClientConnectionManager; +import org.apache.http.impl.conn.PoolingHttpClientConnectionManager; import java.util.Map; @@ -64,10 +64,14 @@ public static AbstractSink create(Map configuration, StatsDRepor * @return CloseableHttpClient */ private static CloseableHttpClient newHttpClient(PromSinkConfig promSinkConfig) { + Integer maxPromConnections = promSinkConfig.getSinkPromMaxConnections(); RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(promSinkConfig.getSinkPromRequestTimeoutMs()) .setConnectionRequestTimeout(promSinkConfig.getSinkPromRequestTimeoutMs()) .setConnectTimeout(promSinkConfig.getSinkPromRequestTimeoutMs()).build(); - BasicHttpClientConnectionManager connectionManager = new BasicHttpClientConnectionManager(); + PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(); + connectionManager.setMaxTotal(maxPromConnections); + connectionManager.setDefaultMaxPerRoute(maxPromConnections); + HttpClientBuilder builder = HttpClients.custom().setConnectionManager(connectionManager).setDefaultRequestConfig(requestConfig); return builder.build(); From 0d6a15c19a488d89605e8de47cc9fcbeaf93eefe Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Thu, 29 Sep 2022 07:52:59 +0530 Subject: [PATCH 2/5] fix: update default SINK_PROM_MAX_CONNECTIONS --- docs/docs/sinks/prometheus-sink.md | 4 ++-- src/main/java/io/odpf/firehose/config/PromSinkConfig.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/docs/sinks/prometheus-sink.md b/docs/docs/sinks/prometheus-sink.md index d7cba5339..1ca2a7ba0 100644 --- a/docs/docs/sinks/prometheus-sink.md +++ b/docs/docs/sinks/prometheus-sink.md @@ -21,9 +21,9 @@ Defines the connection timeout for the request in millis. Defines the maximum number of HTTP connections with Prometheus. -- Example value: `5` +- Example value: `10` - Type: `required` -- Default value: `5` +- Default value: `10` ### `SINK_PROM_RETRY_STATUS_CODE_RANGES` diff --git a/src/main/java/io/odpf/firehose/config/PromSinkConfig.java b/src/main/java/io/odpf/firehose/config/PromSinkConfig.java index 50754def1..2dd7b8980 100644 --- a/src/main/java/io/odpf/firehose/config/PromSinkConfig.java +++ b/src/main/java/io/odpf/firehose/config/PromSinkConfig.java @@ -23,7 +23,7 @@ public interface PromSinkConfig extends AppConfig { Integer getSinkPromRequestTimeoutMs(); @Key("SINK_PROM_MAX_CONNECTIONS") - @DefaultValue("5") + @DefaultValue("10") Integer getSinkPromMaxConnections(); @Key("SINK_PROM_SERVICE_URL") From 0ce082d2dbd6031244196940c4e390375e4e98db Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Mon, 10 Oct 2022 09:59:04 +0530 Subject: [PATCH 3/5] fix: changed default value for max prom connection to use default PoolingHttpClientConnectionManager --- docs/docs/sinks/prometheus-sink.md | 3 +-- src/main/java/io/odpf/firehose/config/PromSinkConfig.java | 1 - .../io/odpf/firehose/sink/prometheus/PromSinkFactory.java | 7 ++++--- 3 files changed, 5 insertions(+), 6 deletions(-) diff --git a/docs/docs/sinks/prometheus-sink.md b/docs/docs/sinks/prometheus-sink.md index 1ca2a7ba0..2425b5591 100644 --- a/docs/docs/sinks/prometheus-sink.md +++ b/docs/docs/sinks/prometheus-sink.md @@ -22,8 +22,7 @@ Defines the connection timeout for the request in millis. Defines the maximum number of HTTP connections with Prometheus. - Example value: `10` -- Type: `required` -- Default value: `10` +- Type: `optional` ### `SINK_PROM_RETRY_STATUS_CODE_RANGES` diff --git a/src/main/java/io/odpf/firehose/config/PromSinkConfig.java b/src/main/java/io/odpf/firehose/config/PromSinkConfig.java index 2dd7b8980..8f4bb4bbf 100644 --- a/src/main/java/io/odpf/firehose/config/PromSinkConfig.java +++ b/src/main/java/io/odpf/firehose/config/PromSinkConfig.java @@ -23,7 +23,6 @@ public interface PromSinkConfig extends AppConfig { Integer getSinkPromRequestTimeoutMs(); @Key("SINK_PROM_MAX_CONNECTIONS") - @DefaultValue("10") Integer getSinkPromMaxConnections(); @Key("SINK_PROM_SERVICE_URL") diff --git a/src/main/java/io/odpf/firehose/sink/prometheus/PromSinkFactory.java b/src/main/java/io/odpf/firehose/sink/prometheus/PromSinkFactory.java index 76e9cd15b..af6ac8ff4 100644 --- a/src/main/java/io/odpf/firehose/sink/prometheus/PromSinkFactory.java +++ b/src/main/java/io/odpf/firehose/sink/prometheus/PromSinkFactory.java @@ -64,13 +64,14 @@ public static AbstractSink create(Map configuration, StatsDRepor * @return CloseableHttpClient */ private static CloseableHttpClient newHttpClient(PromSinkConfig promSinkConfig) { - Integer maxPromConnections = promSinkConfig.getSinkPromMaxConnections(); RequestConfig requestConfig = RequestConfig.custom().setSocketTimeout(promSinkConfig.getSinkPromRequestTimeoutMs()) .setConnectionRequestTimeout(promSinkConfig.getSinkPromRequestTimeoutMs()) .setConnectTimeout(promSinkConfig.getSinkPromRequestTimeoutMs()).build(); PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(); - connectionManager.setMaxTotal(maxPromConnections); - connectionManager.setDefaultMaxPerRoute(maxPromConnections); + if(promSinkConfig.getSinkPromMaxConnections() != null && promSinkConfig.getSinkPromMaxConnections() > 0){ + connectionManager.setMaxTotal(promSinkConfig.getSinkPromMaxConnections()); + connectionManager.setDefaultMaxPerRoute(promSinkConfig.getSinkPromMaxConnections()); + } HttpClientBuilder builder = HttpClients.custom().setConnectionManager(connectionManager).setDefaultRequestConfig(requestConfig); From 5fbc62344d29fbe11c91f5bbc644e041ab3fe599 Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Mon, 10 Oct 2022 10:09:36 +0530 Subject: [PATCH 4/5] fix: styling --- .../java/io/odpf/firehose/sink/prometheus/PromSinkFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/odpf/firehose/sink/prometheus/PromSinkFactory.java b/src/main/java/io/odpf/firehose/sink/prometheus/PromSinkFactory.java index af6ac8ff4..b9b5ccfd7 100644 --- a/src/main/java/io/odpf/firehose/sink/prometheus/PromSinkFactory.java +++ b/src/main/java/io/odpf/firehose/sink/prometheus/PromSinkFactory.java @@ -68,7 +68,7 @@ private static CloseableHttpClient newHttpClient(PromSinkConfig promSinkConfig) .setConnectionRequestTimeout(promSinkConfig.getSinkPromRequestTimeoutMs()) .setConnectTimeout(promSinkConfig.getSinkPromRequestTimeoutMs()).build(); PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager(); - if(promSinkConfig.getSinkPromMaxConnections() != null && promSinkConfig.getSinkPromMaxConnections() > 0){ + if (promSinkConfig.getSinkPromMaxConnections() != null && promSinkConfig.getSinkPromMaxConnections() > 0) { connectionManager.setMaxTotal(promSinkConfig.getSinkPromMaxConnections()); connectionManager.setDefaultMaxPerRoute(promSinkConfig.getSinkPromMaxConnections()); } From f1460353f93e2e2b730e49c29c8c4593b057753e Mon Sep 17 00:00:00 2001 From: Mayank Rai Date: Mon, 10 Oct 2022 10:16:31 +0530 Subject: [PATCH 5/5] fix: added default value in doc --- docs/docs/sinks/prometheus-sink.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/docs/sinks/prometheus-sink.md b/docs/docs/sinks/prometheus-sink.md index 2425b5591..cf3ba49fa 100644 --- a/docs/docs/sinks/prometheus-sink.md +++ b/docs/docs/sinks/prometheus-sink.md @@ -23,6 +23,7 @@ Defines the maximum number of HTTP connections with Prometheus. - Example value: `10` - Type: `optional` +- Default value: `default no more than 2 concurrent connections per given route and no more 20 connections` ### `SINK_PROM_RETRY_STATUS_CODE_RANGES`