diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java index 3e268499b6304..a8c7358bf9415 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfig.java @@ -214,10 +214,10 @@ public class ElasticSearchConfig implements Serializable { @FieldDoc( required = false, - defaultValue = "5", - help = "Idle connection timeout to prevent a read timeout." + defaultValue = "30000", + help = "Idle connection timeout to prevent a connection timeout due to inactivity." ) - private int connectionIdleTimeoutInMs = 5; + private int connectionIdleTimeoutInMs = 30000; @FieldDoc( required = false, diff --git a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClient.java b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClient.java index a81a229607b55..d22d42c532516 100644 --- a/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClient.java +++ b/pulsar-io/elastic-search/src/main/java/org/apache/pulsar/io/elasticsearch/client/RestClient.java @@ -83,7 +83,6 @@ public RestClient(ElasticSearchConfig elasticSearchConfig, BulkProcessor.Listene // idle+expired connection evictor thread this.executorService = Executors.newSingleThreadScheduledExecutor(); this.executorService.scheduleAtFixedRate(() -> { - configCallback.connectionManager.closeExpiredConnections(); configCallback.connectionManager.closeIdleConnections( config.getConnectionIdleTimeoutInMs(), TimeUnit.MILLISECONDS); }, diff --git a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java index 7d85c027c48c2..85e30e766f030 100644 --- a/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java +++ b/pulsar-io/elastic-search/src/test/java/org/apache/pulsar/io/elasticsearch/ElasticSearchConfigTests.java @@ -92,7 +92,7 @@ public final void defaultValueTest() throws IOException { assertEquals(config.isCompressionEnabled(), false); assertEquals(config.getConnectTimeoutInMs(), 5000L); assertEquals(config.getConnectionRequestTimeoutInMs(), 1000L); - assertEquals(config.getConnectionIdleTimeoutInMs(), 5L); + assertEquals(config.getConnectionIdleTimeoutInMs(), 30000L); assertEquals(config.getSocketTimeoutInMs(), 60000); assertEquals(config.isStripNulls(), true);