diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java index 33171e18e743d..2256274f01a93 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java @@ -61,6 +61,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Locale; import java.util.Map; @@ -132,7 +133,7 @@ public synchronized void setHosts(HttpHost... hosts) { if (hosts == null || hosts.length == 0) { throw new IllegalArgumentException("hosts must not be null nor empty"); } - Set httpHosts = new HashSet<>(); + Set httpHosts = new LinkedHashSet<>(); AuthCache authCache = new BasicAuthCache(); for (HttpHost host : hosts) { Objects.requireNonNull(host, "host cannot be null"); @@ -143,6 +144,13 @@ public synchronized void setHosts(HttpHost... hosts) { this.blacklist.clear(); } + /** + * Returns the configured hosts + */ + public List getHosts() { + return new ArrayList<>(hostTuple.hosts); + } + /** * Sends a request to the Elasticsearch cluster that the client points to. * Blocks until the request is completed and returns its response or fails diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java index 15fa5c0f99596..5fe5fcae78fee 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.net.URI; +import java.util.Arrays; import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -251,6 +252,37 @@ public void testSetHostsWrongArguments() throws IOException { } } + public void testSetHostsPreservesOrdering() throws Exception { + try (RestClient restClient = createRestClient()) { + HttpHost[] hosts = randomHosts(); + restClient.setHosts(hosts); + assertEquals(Arrays.asList(hosts), restClient.getHosts()); + } + } + + private static HttpHost[] randomHosts() { + int numHosts = randomIntBetween(1, 10); + HttpHost[] hosts = new HttpHost[numHosts]; + for (int i = 0; i < hosts.length; i++) { + hosts[i] = new HttpHost("host-" + i, 9200); + } + return hosts; + } + + public void testSetHostsDuplicatedHosts() throws Exception { + try (RestClient restClient = createRestClient()) { + int numHosts = randomIntBetween(1, 10); + HttpHost[] hosts = new HttpHost[numHosts]; + HttpHost host = new HttpHost("host", 9200); + for (int i = 0; i < hosts.length; i++) { + hosts[i] = host; + } + restClient.setHosts(hosts); + assertEquals(1, restClient.getHosts().size()); + assertEquals(host, restClient.getHosts().get(0)); + } + } + /** * @deprecated will remove method in 7.0 but needs tests until then. Replaced by {@link RequestTests#testConstructor()}. */ diff --git a/client/sniffer/src/main/java/org/elasticsearch/client/sniff/SniffOnFailureListener.java b/client/sniffer/src/main/java/org/elasticsearch/client/sniff/SniffOnFailureListener.java index cbc77351de98b..41051555bae2c 100644 --- a/client/sniffer/src/main/java/org/elasticsearch/client/sniff/SniffOnFailureListener.java +++ b/client/sniffer/src/main/java/org/elasticsearch/client/sniff/SniffOnFailureListener.java @@ -58,7 +58,6 @@ public void onFailure(HttpHost host) { if (sniffer == null) { throw new IllegalStateException("sniffer was not set, unable to sniff on failure"); } - //re-sniff immediately but take out the node that failed - sniffer.sniffOnFailure(host); + sniffer.sniffOnFailure(); } } diff --git a/client/sniffer/src/main/java/org/elasticsearch/client/sniff/Sniffer.java b/client/sniffer/src/main/java/org/elasticsearch/client/sniff/Sniffer.java index c655babd9ed3d..dc873ccd44e10 100644 --- a/client/sniffer/src/main/java/org/elasticsearch/client/sniff/Sniffer.java +++ b/client/sniffer/src/main/java/org/elasticsearch/client/sniff/Sniffer.java @@ -31,12 +31,14 @@ import java.security.PrivilegedAction; import java.util.List; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; /** * Class responsible for sniffing nodes from some source (default is elasticsearch itself) and setting them to a provided instance of @@ -51,101 +53,175 @@ public class Sniffer implements Closeable { private static final Log logger = LogFactory.getLog(Sniffer.class); private static final String SNIFFER_THREAD_NAME = "es_rest_client_sniffer"; - private final Task task; + private final HostsSniffer hostsSniffer; + private final RestClient restClient; + private final long sniffIntervalMillis; + private final long sniffAfterFailureDelayMillis; + private final Scheduler scheduler; + private final AtomicBoolean initialized = new AtomicBoolean(false); + private volatile ScheduledTask nextScheduledTask; Sniffer(RestClient restClient, HostsSniffer hostsSniffer, long sniffInterval, long sniffAfterFailureDelay) { - this.task = new Task(hostsSniffer, restClient, sniffInterval, sniffAfterFailureDelay); + this(restClient, hostsSniffer, new DefaultScheduler(), sniffInterval, sniffAfterFailureDelay); + } + + Sniffer(RestClient restClient, HostsSniffer hostsSniffer, Scheduler scheduler, long sniffInterval, long sniffAfterFailureDelay) { + this.hostsSniffer = hostsSniffer; + this.restClient = restClient; + this.sniffIntervalMillis = sniffInterval; + this.sniffAfterFailureDelayMillis = sniffAfterFailureDelay; + this.scheduler = scheduler; + /* + * The first sniffing round is async, so this constructor returns before nextScheduledTask is assigned to a task. + * The initialized flag is a protection against NPE due to that. + */ + Task task = new Task(sniffIntervalMillis) { + @Override + public void run() { + super.run(); + initialized.compareAndSet(false, true); + } + }; + /* + * We do not keep track of the returned future as we never intend to cancel the initial sniffing round, we rather + * prevent any other operation from being executed till the sniffer is properly initialized + */ + scheduler.schedule(task, 0L); } /** - * Triggers a new sniffing round and explicitly takes out the failed host provided as argument + * Schedule sniffing to run as soon as possible if it isn't already running. Once such sniffing round runs + * it will also schedule a new round after sniffAfterFailureDelay ms. */ - public void sniffOnFailure(HttpHost failedHost) { - this.task.sniffOnFailure(failedHost); + public void sniffOnFailure() { + //sniffOnFailure does nothing until the initial sniffing round has been completed + if (initialized.get()) { + /* + * If sniffing is already running, there is no point in scheduling another round right after the current one. + * Concurrent calls may be checking the same task state, but only the first skip call on the same task returns true. + * The task may also get replaced while we check its state, in which case calling skip on it returns false. + */ + if (this.nextScheduledTask.skip()) { + /* + * We do not keep track of this future as the task will immediately run and we don't intend to cancel it + * due to concurrent sniffOnFailure runs. Effectively the previous (now cancelled or skipped) task will stay + * assigned to nextTask till this onFailure round gets run and schedules its corresponding afterFailure round. + */ + scheduler.schedule(new Task(sniffAfterFailureDelayMillis), 0L); + } + } } - @Override - public void close() throws IOException { - task.shutdown(); + enum TaskState { + WAITING, SKIPPED, STARTED } - private static class Task implements Runnable { - private final HostsSniffer hostsSniffer; - private final RestClient restClient; - - private final long sniffIntervalMillis; - private final long sniffAfterFailureDelayMillis; - private final ScheduledExecutorService scheduledExecutorService; - private final AtomicBoolean running = new AtomicBoolean(false); - private ScheduledFuture scheduledFuture; - - private Task(HostsSniffer hostsSniffer, RestClient restClient, long sniffIntervalMillis, long sniffAfterFailureDelayMillis) { - this.hostsSniffer = hostsSniffer; - this.restClient = restClient; - this.sniffIntervalMillis = sniffIntervalMillis; - this.sniffAfterFailureDelayMillis = sniffAfterFailureDelayMillis; - SnifferThreadFactory threadFactory = new SnifferThreadFactory(SNIFFER_THREAD_NAME); - this.scheduledExecutorService = Executors.newScheduledThreadPool(1, threadFactory); - scheduleNextRun(0); - } - - synchronized void scheduleNextRun(long delayMillis) { - if (scheduledExecutorService.isShutdown() == false) { - try { - if (scheduledFuture != null) { - //regardless of when the next sniff is scheduled, cancel it and schedule a new one with updated delay - this.scheduledFuture.cancel(false); - } - logger.debug("scheduling next sniff in " + delayMillis + " ms"); - this.scheduledFuture = this.scheduledExecutorService.schedule(this, delayMillis, TimeUnit.MILLISECONDS); - } catch(Exception e) { - logger.error("error while scheduling next sniffer task", e); - } - } + class Task implements Runnable { + final long nextTaskDelay; + final AtomicReference taskState = new AtomicReference<>(TaskState.WAITING); + + Task(long nextTaskDelay) { + this.nextTaskDelay = nextTaskDelay; } @Override public void run() { - sniff(null, sniffIntervalMillis); - } - - void sniffOnFailure(HttpHost failedHost) { - sniff(failedHost, sniffAfterFailureDelayMillis); - } - - void sniff(HttpHost excludeHost, long nextSniffDelayMillis) { - if (running.compareAndSet(false, true)) { - try { - List sniffedHosts = hostsSniffer.sniffHosts(); - logger.debug("sniffed hosts: " + sniffedHosts); - if (excludeHost != null) { - sniffedHosts.remove(excludeHost); - } - if (sniffedHosts.isEmpty()) { - logger.warn("no hosts to set, hosts will be updated at the next sniffing round"); - } else { - this.restClient.setHosts(sniffedHosts.toArray(new HttpHost[sniffedHosts.size()])); - } - } catch (Exception e) { - logger.error("error while sniffing nodes", e); - } finally { - scheduleNextRun(nextSniffDelayMillis); - running.set(false); - } + /* + * Skipped or already started tasks do nothing. In most cases tasks will be cancelled and not run, but we want to protect for + * cases where future#cancel returns true yet the task runs. We want to make sure that such tasks do nothing otherwise they will + * schedule another round at the end and so on, leaving us with multiple parallel sniffing "tracks" whish is undesirable. + */ + if (taskState.compareAndSet(TaskState.WAITING, TaskState.STARTED) == false) { + return; } - } - - synchronized void shutdown() { - scheduledExecutorService.shutdown(); try { - if (scheduledExecutorService.awaitTermination(1000, TimeUnit.MILLISECONDS)) { - return; - } - scheduledExecutorService.shutdownNow(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + sniff(); + } catch (Exception e) { + logger.error("error while sniffing nodes", e); + } finally { + Task task = new Task(sniffIntervalMillis); + Future future = scheduler.schedule(task, nextTaskDelay); + //tasks are run by a single threaded executor, so swapping is safe with a simple volatile variable + ScheduledTask previousTask = nextScheduledTask; + nextScheduledTask = new ScheduledTask(task, future); + assert initialized.get() == false || + previousTask.task.isSkipped() || previousTask.task.hasStarted() : "task that we are replacing is neither " + + "cancelled nor has it ever started"; } } + + /** + * Returns true if the task has started, false in case it didn't start (yet?) or it was skipped + */ + boolean hasStarted() { + return taskState.get() == TaskState.STARTED; + } + + /** + * Sets this task to be skipped. Returns true if the task will be skipped, false if the task has already started. + */ + boolean skip() { + /* + * Threads may still get run although future#cancel returns true. We make sure that a task is either cancelled (or skipped), + * or entirely run. In the odd case that future#cancel returns true and the thread still runs, the task won't do anything. + * In case future#cancel returns true but the task has already started, this state change will not succeed hence this method + * returns false and the task will normally run. + */ + return taskState.compareAndSet(TaskState.WAITING, TaskState.SKIPPED); + } + + /** + * Returns true if the task was set to be skipped before it was started + */ + boolean isSkipped() { + return taskState.get() == TaskState.SKIPPED; + } + } + + static final class ScheduledTask { + final Task task; + final Future future; + + ScheduledTask(Task task, Future future) { + this.task = task; + this.future = future; + } + + /** + * Cancels this task. Returns true if the task has been successfully cancelled, meaning it won't be executed + * or if it is its execution won't have any effect. Returns false if the task cannot be cancelled (possibly it was + * already cancelled or already completed). + */ + boolean skip() { + /* + * Future#cancel should return false whenever a task cannot be cancelled, most likely as it has already started. We don't + * trust it much though so we try to cancel hoping that it will work. At the same time we always call skip too, which means + * that if the task has already started the state change will fail. We could potentially not call skip when cancel returns + * false but we prefer to stay on the safe side. + */ + future.cancel(false); + return task.skip(); + } + } + + final void sniff() throws IOException { + List sniffedHosts = hostsSniffer.sniffHosts(); + if (logger.isDebugEnabled()) { + logger.debug("sniffed hosts: " + sniffedHosts); + } + if (sniffedHosts.isEmpty()) { + logger.warn("no hosts to set, hosts will be updated at the next sniffing round"); + } else { + restClient.setHosts(sniffedHosts.toArray(new HttpHost[sniffedHosts.size()])); + } + } + + @Override + public void close() { + if (initialized.get()) { + nextScheduledTask.skip(); + } + this.scheduler.shutdown(); } /** @@ -158,8 +234,62 @@ public static SnifferBuilder builder(RestClient restClient) { return new SnifferBuilder(restClient); } - private static class SnifferThreadFactory implements ThreadFactory { + /** + * The Scheduler interface allows to isolate the sniffing scheduling aspects so that we can test + * the sniffer by injecting when needed a custom scheduler that is more suited for testing. + */ + interface Scheduler { + /** + * Schedules the provided {@link Runnable} to be executed in delayMillis milliseconds + */ + Future schedule(Task task, long delayMillis); + + /** + * Shuts this scheduler down + */ + void shutdown(); + } + + /** + * Default implementation of {@link Scheduler}, based on {@link ScheduledExecutorService} + */ + static final class DefaultScheduler implements Scheduler { + final ScheduledExecutorService executor; + + DefaultScheduler() { + this(initScheduledExecutorService()); + } + + DefaultScheduler(ScheduledExecutorService executor) { + this.executor = executor; + } + + private static ScheduledExecutorService initScheduledExecutorService() { + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new SnifferThreadFactory(SNIFFER_THREAD_NAME)); + executor.setRemoveOnCancelPolicy(true); + return executor; + } + + @Override + public Future schedule(Task task, long delayMillis) { + return executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS); + } + + @Override + public void shutdown() { + executor.shutdown(); + try { + if (executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) { + return; + } + executor.shutdownNow(); + } catch (InterruptedException ignore) { + Thread.currentThread().interrupt(); + } + } + } + static class SnifferThreadFactory implements ThreadFactory { private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; private final ThreadFactory originalThreadFactory; diff --git a/client/sniffer/src/test/java/org/elasticsearch/client/sniff/MockHostsSniffer.java b/client/sniffer/src/test/java/org/elasticsearch/client/sniff/MockHostsSniffer.java index 5a52151d76e01..7550459e9ea50 100644 --- a/client/sniffer/src/test/java/org/elasticsearch/client/sniff/MockHostsSniffer.java +++ b/client/sniffer/src/test/java/org/elasticsearch/client/sniff/MockHostsSniffer.java @@ -21,7 +21,6 @@ import org.apache.http.HttpHost; -import java.io.IOException; import java.util.Collections; import java.util.List; @@ -30,7 +29,7 @@ */ class MockHostsSniffer implements HostsSniffer { @Override - public List sniffHosts() throws IOException { + public List sniffHosts() { return Collections.singletonList(new HttpHost("localhost", 9200)); } } diff --git a/client/sniffer/src/test/java/org/elasticsearch/client/sniff/SnifferTests.java b/client/sniffer/src/test/java/org/elasticsearch/client/sniff/SnifferTests.java new file mode 100644 index 0000000000000..8172774a77d80 --- /dev/null +++ b/client/sniffer/src/test/java/org/elasticsearch/client/sniff/SnifferTests.java @@ -0,0 +1,656 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.sniff; + +import org.apache.http.HttpHost; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestClientTestCase; +import org.elasticsearch.client.sniff.Sniffer.DefaultScheduler; +import org.elasticsearch.client.sniff.Sniffer.Scheduler; +import org.mockito.Matchers; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +public class SnifferTests extends RestClientTestCase { + + /** + * Tests the {@link Sniffer#sniff()} method in isolation. Verifies that it uses the {@link HostsSniffer} implementation + * to retrieve nodes and set them (when not empty) to the provided {@link RestClient} instance. + */ + public void testSniff() throws IOException { + HttpHost initialHost = new HttpHost("localhost", 9200); + try (RestClient restClient = RestClient.builder(initialHost).build()) { + Scheduler noOpScheduler = new Scheduler() { + @Override + public Future schedule(Sniffer.Task task, long delayMillis) { + return mock(Future.class); + } + + @Override + public void shutdown() { + + } + }; + CountingHostsSniffer hostsSniffer = new CountingHostsSniffer(); + int iters = randomIntBetween(5, 30); + try (Sniffer sniffer = new Sniffer(restClient, hostsSniffer, noOpScheduler, 1000L, -1)){ + { + assertEquals(1, restClient.getHosts().size()); + HttpHost httpHost = restClient.getHosts().get(0); + assertEquals("localhost", httpHost.getHostName()); + assertEquals(9200, httpHost.getPort()); + } + int emptyList = 0; + int failures = 0; + int runs = 0; + List lastHosts = Collections.singletonList(initialHost); + for (int i = 0; i < iters; i++) { + try { + runs++; + sniffer.sniff(); + if (hostsSniffer.failures.get() > failures) { + failures++; + fail("should have failed given that hostsSniffer says it threw an exception"); + } else if (hostsSniffer.emptyList.get() > emptyList) { + emptyList++; + assertEquals(lastHosts, restClient.getHosts()); + } else { + assertNotEquals(lastHosts, restClient.getHosts()); + List expectedHosts = CountingHostsSniffer.buildHosts(runs); + assertEquals(expectedHosts, restClient.getHosts()); + lastHosts = restClient.getHosts(); + } + } catch(IOException e) { + if (hostsSniffer.failures.get() > failures) { + failures++; + assertEquals("communication breakdown", e.getMessage()); + } + } + } + assertEquals(hostsSniffer.emptyList.get(), emptyList); + assertEquals(hostsSniffer.failures.get(), failures); + assertEquals(hostsSniffer.runs.get(), runs); + } + } + } + + /** + * Test multiple sniffing rounds by mocking the {@link Scheduler} as well as the {@link HostsSniffer}. + * Simulates the ordinary behaviour of {@link Sniffer} when sniffing on failure is not enabled. + * The {@link CountingHostsSniffer} doesn't make any network connection but may throw exception or return no hosts, which makes + * it possible to verify that errors are properly handled and don't affect subsequent runs and their scheduling. + * The {@link Scheduler} implementation submits rather than scheduling tasks, meaning that it doesn't respect the requested sniff + * delays while allowing to assert that the requested delays for each requested run and the following one are the expected values. + */ + public void testOrdinarySniffRounds() throws Exception { + final long sniffInterval = randomLongBetween(1, Long.MAX_VALUE); + long sniffAfterFailureDelay = randomLongBetween(1, Long.MAX_VALUE); + RestClient restClient = mock(RestClient.class); + CountingHostsSniffer hostsSniffer = new CountingHostsSniffer(); + final int iters = randomIntBetween(30, 100); + final Set> futures = new CopyOnWriteArraySet<>(); + final CountDownLatch completionLatch = new CountDownLatch(1); + final AtomicInteger runs = new AtomicInteger(iters); + final ExecutorService executor = Executors.newSingleThreadExecutor(); + final AtomicReference> lastFuture = new AtomicReference<>(); + final AtomicReference lastTask = new AtomicReference<>(); + Scheduler scheduler = new Scheduler() { + @Override + public Future schedule(Sniffer.Task task, long delayMillis) { + assertEquals(sniffInterval, task.nextTaskDelay); + int numberOfRuns = runs.getAndDecrement(); + if (numberOfRuns == iters) { + //the first call is to schedule the first sniff round from the Sniffer constructor, with delay O + assertEquals(0L, delayMillis); + assertEquals(sniffInterval, task.nextTaskDelay); + } else { + //all of the subsequent times "schedule" is called with delay set to the configured sniff interval + assertEquals(sniffInterval, delayMillis); + assertEquals(sniffInterval, task.nextTaskDelay); + if (numberOfRuns == 0) { + completionLatch.countDown(); + return null; + } + } + //we submit rather than scheduling to make the test quick and not depend on time + Future future = executor.submit(task); + futures.add(future); + if (numberOfRuns == 1) { + lastFuture.set(future); + lastTask.set(task); + } + return future; + } + + @Override + public void shutdown() { + //the executor is closed externally, shutdown is tested separately + } + }; + try { + new Sniffer(restClient, hostsSniffer, scheduler, sniffInterval, sniffAfterFailureDelay); + assertTrue("timeout waiting for sniffing rounds to be completed", completionLatch.await(1000, TimeUnit.MILLISECONDS)); + assertEquals(iters, futures.size()); + //the last future is the only one that may not be completed yet, as the count down happens + //while scheduling the next round which is still part of the execution of the runnable itself. + assertTrue(lastTask.get().hasStarted()); + lastFuture.get().get(); + for (Future future : futures) { + assertTrue(future.isDone()); + future.get(); + } + } finally { + executor.shutdown(); + assertTrue(executor.awaitTermination(1000, TimeUnit.MILLISECONDS)); + } + int totalRuns = hostsSniffer.runs.get(); + assertEquals(iters, totalRuns); + int setHostsRuns = totalRuns - hostsSniffer.failures.get() - hostsSniffer.emptyList.get(); + verify(restClient, times(setHostsRuns)).setHosts(Matchers.anyVararg()); + verifyNoMoreInteractions(restClient); + } + + /** + * Test that {@link Sniffer#close()} shuts down the underlying {@link Scheduler}, and that such calls are idempotent. + * Also verifies that the next scheduled round gets cancelled. + */ + public void testClose() { + final Future future = mock(Future.class); + long sniffInterval = randomLongBetween(1, Long.MAX_VALUE); + long sniffAfterFailureDelay = randomLongBetween(1, Long.MAX_VALUE); + RestClient restClient = mock(RestClient.class); + final AtomicInteger shutdown = new AtomicInteger(0); + final AtomicBoolean initialized = new AtomicBoolean(false); + Scheduler scheduler = new Scheduler() { + @Override + public Future schedule(Sniffer.Task task, long delayMillis) { + if (initialized.compareAndSet(false, true)) { + //run from the same thread so the sniffer gets for sure initialized and the scheduled task gets cancelled on close + task.run(); + } + return future; + } + + @Override + public void shutdown() { + shutdown.incrementAndGet(); + } + }; + + Sniffer sniffer = new Sniffer(restClient, new MockHostsSniffer(), scheduler, sniffInterval, sniffAfterFailureDelay); + assertEquals(0, shutdown.get()); + int iters = randomIntBetween(3, 10); + for (int i = 1; i <= iters; i++) { + sniffer.close(); + verify(future, times(i)).cancel(false); + assertEquals(i, shutdown.get()); + } + } + + public void testSniffOnFailureNotInitialized() { + RestClient restClient = mock(RestClient.class); + CountingHostsSniffer hostsSniffer = new CountingHostsSniffer(); + long sniffInterval = randomLongBetween(1, Long.MAX_VALUE); + long sniffAfterFailureDelay = randomLongBetween(1, Long.MAX_VALUE); + final AtomicInteger scheduleCalls = new AtomicInteger(0); + Scheduler scheduler = new Scheduler() { + @Override + public Future schedule(Sniffer.Task task, long delayMillis) { + scheduleCalls.incrementAndGet(); + return null; + } + + @Override + public void shutdown() { + } + }; + + Sniffer sniffer = new Sniffer(restClient, hostsSniffer, scheduler, sniffInterval, sniffAfterFailureDelay); + for (int i = 0; i < 10; i++) { + sniffer.sniffOnFailure(); + } + assertEquals(1, scheduleCalls.get()); + int totalRuns = hostsSniffer.runs.get(); + assertEquals(0, totalRuns); + int setHostsRuns = totalRuns - hostsSniffer.failures.get() - hostsSniffer.emptyList.get(); + verify(restClient, times(setHostsRuns)).setHosts(Matchers.anyVararg()); + verifyNoMoreInteractions(restClient); + } + + /** + * Test behaviour when a bunch of onFailure sniffing rounds are triggered in parallel. Each run will always + * schedule a subsequent afterFailure round. Also, for each onFailure round that starts, the net scheduled round + * (either afterFailure or ordinary) gets cancelled. + */ + public void testSniffOnFailure() throws Exception { + RestClient restClient = mock(RestClient.class); + CountingHostsSniffer hostsSniffer = new CountingHostsSniffer(); + final AtomicBoolean initializing = new AtomicBoolean(true); + final long sniffInterval = randomLongBetween(1, Long.MAX_VALUE); + final long sniffAfterFailureDelay = randomLongBetween(1, Long.MAX_VALUE); + int minNumOnFailureRounds = randomIntBetween(5, 10); + final CountDownLatch initializingLatch = new CountDownLatch(1); + final Set ordinaryRoundsTasks = new CopyOnWriteArraySet<>(); + final AtomicReference> initializingFuture = new AtomicReference<>(); + final Set onFailureTasks = new CopyOnWriteArraySet<>(); + final Set afterFailureTasks = new CopyOnWriteArraySet<>(); + final AtomicBoolean onFailureCompleted = new AtomicBoolean(false); + final CountDownLatch completionLatch = new CountDownLatch(1); + final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + try { + Scheduler scheduler = new Scheduler() { + @Override + public Future schedule(final Sniffer.Task task, long delayMillis) { + if (initializing.compareAndSet(true, false)) { + assertEquals(0L, delayMillis); + Future future = executor.submit(new Runnable() { + @Override + public void run() { + try { + task.run(); + } finally { + //we need to make sure that the sniffer is initialized, so the sniffOnFailure + //call does what it needs to do. Otherwise nothing happens until initialized. + initializingLatch.countDown(); + } + } + }); + assertTrue(initializingFuture.compareAndSet(null, future)); + return future; + } + if (delayMillis == 0L) { + Future future = executor.submit(task); + onFailureTasks.add(new Sniffer.ScheduledTask(task, future)); + return future; + } + if (delayMillis == sniffAfterFailureDelay) { + Future future = scheduleOrSubmit(task); + afterFailureTasks.add(new Sniffer.ScheduledTask(task, future)); + return future; + } + + assertEquals(sniffInterval, delayMillis); + assertEquals(sniffInterval, task.nextTaskDelay); + + if (onFailureCompleted.get() && onFailureTasks.size() == afterFailureTasks.size()) { + completionLatch.countDown(); + return mock(Future.class); + } + + Future future = scheduleOrSubmit(task); + ordinaryRoundsTasks.add(new Sniffer.ScheduledTask(task, future)); + return future; + } + + private Future scheduleOrSubmit(Sniffer.Task task) { + if (randomBoolean()) { + return executor.schedule(task, randomLongBetween(0L, 200L), TimeUnit.MILLISECONDS); + } else { + return executor.submit(task); + } + } + + @Override + public void shutdown() { + } + }; + final Sniffer sniffer = new Sniffer(restClient, hostsSniffer, scheduler, sniffInterval, sniffAfterFailureDelay); + assertTrue("timeout waiting for sniffer to get initialized", initializingLatch.await(1000, TimeUnit.MILLISECONDS)); + + ExecutorService onFailureExecutor = Executors.newFixedThreadPool(randomIntBetween(5, 20)); + Set> onFailureFutures = new CopyOnWriteArraySet<>(); + try { + //with tasks executing quickly one after each other, it is very likely that the onFailure round gets skipped + //as another round is already running. We retry till enough runs get through as that's what we want to test. + while (onFailureTasks.size() < minNumOnFailureRounds) { + onFailureFutures.add(onFailureExecutor.submit(new Runnable() { + @Override + public void run() { + sniffer.sniffOnFailure(); + } + })); + } + assertThat(onFailureFutures.size(), greaterThanOrEqualTo(minNumOnFailureRounds)); + for (Future onFailureFuture : onFailureFutures) { + assertNull(onFailureFuture.get()); + } + onFailureCompleted.set(true); + } finally { + onFailureExecutor.shutdown(); + onFailureExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS); + } + + assertFalse(initializingFuture.get().isCancelled()); + assertTrue(initializingFuture.get().isDone()); + assertNull(initializingFuture.get().get()); + + assertTrue("timeout waiting for sniffing rounds to be completed", completionLatch.await(1000, TimeUnit.MILLISECONDS)); + assertThat(onFailureTasks.size(), greaterThanOrEqualTo(minNumOnFailureRounds)); + assertEquals(onFailureTasks.size(), afterFailureTasks.size()); + + for (Sniffer.ScheduledTask onFailureTask : onFailureTasks) { + assertFalse(onFailureTask.future.isCancelled()); + assertTrue(onFailureTask.future.isDone()); + assertNull(onFailureTask.future.get()); + assertTrue(onFailureTask.task.hasStarted()); + assertFalse(onFailureTask.task.isSkipped()); + } + + int cancelledTasks = 0; + int completedTasks = onFailureTasks.size() + 1; + for (Sniffer.ScheduledTask afterFailureTask : afterFailureTasks) { + if (assertTaskCancelledOrCompleted(afterFailureTask)) { + completedTasks++; + } else { + cancelledTasks++; + } + } + + assertThat(ordinaryRoundsTasks.size(), greaterThan(0)); + for (Sniffer.ScheduledTask task : ordinaryRoundsTasks) { + if (assertTaskCancelledOrCompleted(task)) { + completedTasks++; + } else { + cancelledTasks++; + } + } + assertEquals(onFailureTasks.size(), cancelledTasks); + + assertEquals(completedTasks, hostsSniffer.runs.get()); + int setHostsRuns = hostsSniffer.runs.get() - hostsSniffer.failures.get() - hostsSniffer.emptyList.get(); + verify(restClient, times(setHostsRuns)).setHosts(Matchers.anyVararg()); + verifyNoMoreInteractions(restClient); + } finally { + executor.shutdown(); + executor.awaitTermination(1000L, TimeUnit.MILLISECONDS); + } + } + + private static boolean assertTaskCancelledOrCompleted(Sniffer.ScheduledTask task) throws ExecutionException, InterruptedException { + if (task.task.isSkipped()) { + assertTrue(task.future.isCancelled()); + try { + task.future.get(); + fail("cancellation exception should have been thrown"); + } catch(CancellationException ignore) { + } + return false; + } else { + try { + assertNull(task.future.get()); + } catch(CancellationException ignore) { + assertTrue(task.future.isCancelled()); + } + assertTrue(task.future.isDone()); + assertTrue(task.task.hasStarted()); + return true; + } + } + + public void testTaskCancelling() throws Exception { + RestClient restClient = mock(RestClient.class); + HostsSniffer hostsSniffer = mock(HostsSniffer.class); + Scheduler noOpScheduler = new Scheduler() { + @Override + public Future schedule(Sniffer.Task task, long delayMillis) { + return null; + } + + @Override + public void shutdown() { + } + }; + Sniffer sniffer = new Sniffer(restClient, hostsSniffer, noOpScheduler, 0L, 0L); + ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + try { + int numIters = randomIntBetween(50, 100); + for (int i = 0; i < numIters; i++) { + Sniffer.Task task = sniffer.new Task(0L); + TaskWrapper wrapper = new TaskWrapper(task); + Future future; + if (rarely()) { + future = executor.schedule(wrapper, randomLongBetween(0L, 200L), TimeUnit.MILLISECONDS); + } else { + future = executor.submit(wrapper); + } + Sniffer.ScheduledTask scheduledTask = new Sniffer.ScheduledTask(task, future); + boolean skip = scheduledTask.skip(); + try { + assertNull(future.get()); + } catch(CancellationException ignore) { + assertTrue(future.isCancelled()); + } + + if (skip) { + //the task was either cancelled before starting, in which case it will never start (thanks to Future#cancel), + //or skipped, in which case it will run but do nothing (thanks to Task#skip). + //Here we want to make sure that whenever skip returns true, the task either won't run or it won't do anything, + //otherwise we may end up with parallel sniffing tracks given that each task schedules the following one. We need to + // make sure that onFailure takes scheduling over while at the same time ordinary rounds don't go on. + assertFalse(task.hasStarted()); + assertTrue(task.isSkipped()); + assertTrue(future.isCancelled()); + assertTrue(future.isDone()); + } else { + //if a future is cancelled when its execution has already started, future#get throws CancellationException before + //completion. The execution continues though so we use a latch to try and wait for the task to be completed. + //Here we want to make sure that whenever skip returns false, the task will be completed, otherwise we may be + //missing to schedule the following round, which means no sniffing will ever happen again besides on failure sniffing. + assertTrue(wrapper.await()); + //the future may or may not be cancelled but the task has for sure started and completed + assertTrue(task.toString(), task.hasStarted()); + assertFalse(task.isSkipped()); + assertTrue(future.isDone()); + } + //subsequent cancel calls return false for sure + int cancelCalls = randomIntBetween(1, 10); + for (int j = 0; j < cancelCalls; j++) { + assertFalse(scheduledTask.skip()); + } + } + } finally { + executor.shutdown(); + executor.awaitTermination(1000, TimeUnit.MILLISECONDS); + } + } + + /** + * Wraps a {@link Sniffer.Task} and allows to wait for its completion. This is needed to verify + * that tasks are either never started or always completed. Calling {@link Future#get()} against a cancelled future will + * throw {@link CancellationException} straight-away but the execution of the task will continue if it had already started, + * in which case {@link Future#cancel(boolean)} returns true which is not very helpful. + */ + private static final class TaskWrapper implements Runnable { + final Sniffer.Task task; + final CountDownLatch completionLatch = new CountDownLatch(1); + + TaskWrapper(Sniffer.Task task) { + this.task = task; + } + + @Override + public void run() { + try { + task.run(); + } finally { + completionLatch.countDown(); + } + } + + boolean await() throws InterruptedException { + return completionLatch.await(1000, TimeUnit.MILLISECONDS); + } + } + + /** + * Mock {@link HostsSniffer} implementation used for testing, which most of the times return a fixed host. + * It rarely throws exception or return an empty list of hosts, to make sure that such situations are properly handled. + * It also asserts that it never gets called concurrently, based on the assumption that only one sniff run can be run + * at a given point in time. + */ + private static class CountingHostsSniffer implements HostsSniffer { + private final AtomicInteger runs = new AtomicInteger(0); + private final AtomicInteger failures = new AtomicInteger(0); + private final AtomicInteger emptyList = new AtomicInteger(0); + + @Override + public List sniffHosts() throws IOException { + int run = runs.incrementAndGet(); + if (rarely()) { + failures.incrementAndGet(); + //check that if communication breaks, sniffer keeps on working + throw new IOException("communication breakdown"); + } + if (rarely()) { + emptyList.incrementAndGet(); + return Collections.emptyList(); + } + return buildHosts(run); + } + + private static List buildHosts(int run) { + int size = run % 5 + 1; + assert size > 0; + List hosts = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + hosts.add(new HttpHost("sniffed-" + run, 9200 + i)); + } + return hosts; + } + } + + @SuppressWarnings("unchecked") + public void testDefaultSchedulerSchedule() { + RestClient restClient = mock(RestClient.class); + HostsSniffer hostsSniffer = mock(HostsSniffer.class); + Scheduler noOpScheduler = new Scheduler() { + @Override + public Future schedule(Sniffer.Task task, long delayMillis) { + return mock(Future.class); + } + + @Override + public void shutdown() { + + } + }; + Sniffer sniffer = new Sniffer(restClient, hostsSniffer, noOpScheduler, 0L, 0L); + Sniffer.Task task = sniffer.new Task(randomLongBetween(1, Long.MAX_VALUE)); + + ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class); + final ScheduledFuture mockedFuture = mock(ScheduledFuture.class); + when(scheduledExecutorService.schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class))) + .then(new Answer>() { + @Override + public ScheduledFuture answer(InvocationOnMock invocationOnMock) { + return mockedFuture; + } + }); + DefaultScheduler scheduler = new DefaultScheduler(scheduledExecutorService); + long delay = randomLongBetween(1, Long.MAX_VALUE); + Future future = scheduler.schedule(task, delay); + assertSame(mockedFuture, future); + verify(scheduledExecutorService).schedule(task, delay, TimeUnit.MILLISECONDS); + verifyNoMoreInteractions(scheduledExecutorService, mockedFuture); + } + + public void testDefaultSchedulerThreadFactory() { + DefaultScheduler defaultScheduler = new DefaultScheduler(); + try { + ScheduledExecutorService executorService = defaultScheduler.executor; + assertThat(executorService, instanceOf(ScheduledThreadPoolExecutor.class)); + assertThat(executorService, instanceOf(ScheduledThreadPoolExecutor.class)); + ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) executorService; + assertTrue(executor.getRemoveOnCancelPolicy()); + assertFalse(executor.getContinueExistingPeriodicTasksAfterShutdownPolicy()); + assertTrue(executor.getExecuteExistingDelayedTasksAfterShutdownPolicy()); + assertThat(executor.getThreadFactory(), instanceOf(Sniffer.SnifferThreadFactory.class)); + int iters = randomIntBetween(3, 10); + for (int i = 1; i <= iters; i++) { + Thread thread = executor.getThreadFactory().newThread(new Runnable() { + @Override + public void run() { + + } + }); + assertThat(thread.getName(), equalTo("es_rest_client_sniffer[T#" + i + "]")); + assertThat(thread.isDaemon(), is(true)); + } + } finally { + defaultScheduler.shutdown(); + } + } + + public void testDefaultSchedulerShutdown() throws Exception { + ScheduledThreadPoolExecutor executor = mock(ScheduledThreadPoolExecutor.class); + DefaultScheduler defaultScheduler = new DefaultScheduler(executor); + defaultScheduler.shutdown(); + verify(executor).shutdown(); + verify(executor).awaitTermination(1000, TimeUnit.MILLISECONDS); + verify(executor).shutdownNow(); + verifyNoMoreInteractions(executor); + + when(executor.awaitTermination(1000, TimeUnit.MILLISECONDS)).thenReturn(true); + defaultScheduler.shutdown(); + verify(executor, times(2)).shutdown(); + verify(executor, times(2)).awaitTermination(1000, TimeUnit.MILLISECONDS); + verifyNoMoreInteractions(executor); + } +} diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporter.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporter.java index 88b1a8a9db18f..fe00e8ce83ec6 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporter.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/HttpExporter.java @@ -41,8 +41,6 @@ import org.joda.time.format.DateTimeFormatter; import javax.net.ssl.SSLContext; - -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -658,12 +656,12 @@ public void doClose() { if (sniffer != null) { sniffer.close(); } - } catch (IOException | RuntimeException e) { + } catch (Exception e) { logger.error("an error occurred while closing the internal client sniffer", e); } finally { try { client.close(); - } catch (IOException | RuntimeException e) { + } catch (Exception e) { logger.error("an error occurred while closing the internal client", e); } } diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/NodeFailureListener.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/NodeFailureListener.java index 6590232fda1ff..92febdf3561f8 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/NodeFailureListener.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/http/NodeFailureListener.java @@ -86,7 +86,7 @@ public void onFailure(final HttpHost host) { resource.markDirty(); } if (sniffer != null) { - sniffer.sniffOnFailure(host); + sniffer.sniffOnFailure(); } } diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/NodeFailureListenerTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/NodeFailureListenerTests.java index f1ecb799406e8..08512e82e145d 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/NodeFailureListenerTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/http/NodeFailureListenerTests.java @@ -46,7 +46,7 @@ public void testSnifferNotifiedOnFailure() { listener.onFailure(host); - verify(sniffer).sniffOnFailure(host); + verify(sniffer).sniffOnFailure(); } public void testResourceNotifiedOnFailure() { @@ -71,7 +71,7 @@ public void testResourceAndSnifferNotifiedOnFailure() { } if (optionalSniffer != null) { - verify(sniffer).sniffOnFailure(host); + verify(sniffer).sniffOnFailure(); } }