diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java index 25c06880e158e..a4cce0877c762 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchClient.java @@ -193,7 +193,6 @@ public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) { // idle+expired connection evictor thread this.executorService = Executors.newSingleThreadScheduledExecutor(); this.executorService.scheduleAtFixedRate(() -> { - configCallback.connectionManager.closeExpiredConnections(); configCallback.connectionManager.closeIdleConnections( config.getConnectionIdleTimeoutInMs(), TimeUnit.MILLISECONDS); }, diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java index 44cdee9553a18..95ac9ffdc8e2c 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java @@ -196,10 +196,10 @@ public class ElasticSearchConfig implements Serializable { @FieldDoc( required = false, - defaultValue = "5", - help = "Idle connection timeout to prevent a read timeout." + defaultValue = "30000", + help = "Idle connection timeout to prevent a connection timeout due to inactivity." ) - private int connectionIdleTimeoutInMs = 5; + private int connectionIdleTimeoutInMs = 30000; @FieldDoc( required = false, diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java index 2270349ee8352..c675f3ace3429 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java @@ -86,7 +86,7 @@ public final void defaultValueTest() throws IOException { assertEquals(config.isCompressionEnabled(), false); assertEquals(config.getConnectTimeoutInMs(), 5000L); assertEquals(config.getConnectionRequestTimeoutInMs(), 1000L); - assertEquals(config.getConnectionIdleTimeoutInMs(), 5L); + assertEquals(config.getConnectionIdleTimeoutInMs(), 30000L); assertEquals(config.getSocketTimeoutInMs(), 60000); assertEquals(config.isStripNulls(), true);