diff --git a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java index 8d92ae9812d3..7623853d8d6a 100644 --- a/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java +++ b/backends-common/elasticsearch/src/main/java/org/apache/james/backends/es/ClientProviderImpl.java @@ -20,6 +20,7 @@ import java.net.InetAddress; import java.util.Optional; +import java.util.concurrent.ConcurrentLinkedQueue; import org.apache.james.util.Host; import org.elasticsearch.client.Client; @@ -53,11 +54,13 @@ public static ClientProviderImpl fromHosts(ImmutableList hosts, Optional hosts; private final Optional clusterName; + private final ConcurrentLinkedQueue clients; private ClientProviderImpl(ImmutableList hosts, Optional clusterName) { Preconditions.checkArgument(!hosts.isEmpty(), "You should provide at least one host"); this.hosts = hosts; this.clusterName = clusterName; + this.clients = new ConcurrentLinkedQueue<>(); } @@ -72,6 +75,7 @@ public Client get() { InetAddress.getByName(host.getHostName()), host.getPort()))); hosts.forEach(consumer.sneakyThrow()); + clients.add(transportClient); return transportClient; } @@ -83,4 +87,12 @@ public Client get() { } return Settings.EMPTY; } + + public ConcurrentLinkedQueue getClients() { + return clients; + } + + public void clearClients() { + clients.clear(); + } } diff --git a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearch.java b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearch.java index 10e691292e67..6a2473596121 100644 --- a/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearch.java +++ b/backends-common/elasticsearch/src/test/java/org/apache/james/backends/es/DockerElasticSearch.java @@ -22,12 +22,16 @@ import static org.assertj.core.api.Assertions.assertThat; import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; import org.apache.http.HttpStatus; import org.apache.james.util.Host; import org.apache.james.util.docker.DockerGenericContainer; import org.apache.james.util.docker.Images; import org.apache.james.util.docker.RateLimiters; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.lease.Releasable; import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy; import com.google.common.collect.ImmutableList; @@ -59,9 +63,14 @@ static ElasticSearchAPI from(Host esHttpHost) { private static final int ES_HTTP_PORT = 9200; private static final int ES_TCP_PORT = 9300; + private static final int MAX_TESTS_PLAYED = 50; + private final DockerGenericContainer eSContainer; + private ClientProviderImpl clientProvider; + private AtomicInteger testPlayedCounter; public DockerElasticSearch() { + this.testPlayedCounter = new AtomicInteger(); this.eSContainer = new DockerGenericContainer(Images.ELASTICSEARCH_2) .withExposedPorts(ES_HTTP_PORT, ES_TCP_PORT) .waitingFor(new HostPortWaitStrategy().withRateLimiter(RateLimiters.TWENTIES_PER_SECOND)); @@ -70,6 +79,7 @@ public DockerElasticSearch() { public void start() { if (!eSContainer.isRunning()) { eSContainer.start(); + clientProvider = createClientProvider(); } } @@ -108,6 +118,17 @@ public void unpause() { public void cleanUpData() { assertThat(esAPI().deleteAllIndexes().status()) .isEqualTo(HttpStatus.SC_OK); + + Stream.of(clientProvider.getClients().toArray(new Client[]{})) + .forEach(Releasable::close); + clientProvider.clearClients(); + + // just assuming this is called after every tests. Actually I'm sure about it. + if (testPlayedCounter.incrementAndGet() >= MAX_TESTS_PLAYED) { + stop(); + start(); + testPlayedCounter.set(0); + } } public void awaitForElasticSearch() { @@ -116,8 +137,11 @@ public void awaitForElasticSearch() { } public ClientProvider clientProvider() { - Optional noClusterName = Optional.empty(); - return ClientProviderImpl.fromHosts(ImmutableList.of(getTcpHost()), noClusterName); + return clientProvider; + } + + private ClientProviderImpl createClientProvider() { + return ClientProviderImpl.fromHosts(ImmutableList.of(getTcpHost()), Optional.empty()); } private ElasticSearchAPI esAPI() {