Skip to content

Commit

Permalink
Add test that get triggers shard search active (#46317)
Browse files Browse the repository at this point in the history
This commit is a follow-up to a change that fixed that multi-get was not
triggering a shard to become search active. In that change, we added a
test that multi-get properly triggers a shard to become search
active. This commit is a follow-up to that change which adds a test for
the get case. While get is already handled correctly in production code,
there was not a test for it. This commit adds one. Additionally, we
factor all the search idle tests from IndexShardIT into a separate test
class, as an effort to keep related tests together instead of a single
large test class containing a jumble of tests, and also to keep test
classes smaller for better parallelization.
  • Loading branch information
jasontedor authored Sep 4, 2019
1 parent bf0b3ce commit ba90ad9
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 136 deletions.
136 changes: 0 additions & 136 deletions server/src/test/java/org/elasticsearch/index/shard/IndexShardIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@
import org.elasticsearch.action.admin.cluster.node.stats.NodeStats;
import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.elasticsearch.action.admin.indices.stats.IndexStats;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.IndicesOptions;
Expand Down Expand Up @@ -84,7 +82,6 @@
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.test.IndexSettingsModule;
import org.elasticsearch.test.InternalSettingsPlugin;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Assert;

import java.io.IOException;
Expand All @@ -101,15 +98,11 @@
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.IntToLongFunction;
import java.util.function.Predicate;
import java.util.stream.Stream;

Expand All @@ -127,7 +120,6 @@
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoFailures;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSearchHits;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.either;
Expand Down Expand Up @@ -689,134 +681,6 @@ private static ShardRouting getInitializingShardRouting(ShardRouting existingSha
return shardRouting;
}

public void testAutomaticRefreshSearch() throws InterruptedException {
runTestAutomaticRefresh(numDocs -> client().prepareSearch("test").get().getHits().getTotalHits().value);
}

public void testAutomaticRefreshMultiGet() throws InterruptedException {
runTestAutomaticRefresh(
numDocs -> {
final MultiGetRequest request = new MultiGetRequest();
request.realtime(false);
for (int i = 0; i < numDocs; i++) {
request.add("test", "" + i);
}
return Arrays.stream(client().multiGet(request).actionGet().getResponses()).filter(r -> r.getResponse().isExists()).count();
});
}

private void runTestAutomaticRefresh(final IntToLongFunction count) throws InterruptedException {
TimeValue randomTimeValue = randomFrom(random(), null, TimeValue.ZERO, TimeValue.timeValueMillis(randomIntBetween(0, 1000)));
Settings.Builder builder = Settings.builder();
if (randomTimeValue != null) {
builder.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), randomTimeValue);
}
IndexService indexService = createIndex("test", builder.build());
assertFalse(indexService.getIndexSettings().isExplicitRefresh());
ensureGreen();
AtomicInteger totalNumDocs = new AtomicInteger(Integer.MAX_VALUE);
assertNoSearchHits(client().prepareSearch().get());
int numDocs = scaledRandomIntBetween(25, 100);
totalNumDocs.set(numDocs);
CountDownLatch indexingDone = new CountDownLatch(numDocs);
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
indexingDone.countDown(); // one doc is indexed above blocking
IndexShard shard = indexService.getShard(0);
boolean hasRefreshed = shard.scheduledRefresh();
if (randomTimeValue == TimeValue.ZERO) {
// with ZERO we are guaranteed to see the doc since we will wait for a refresh in the background
assertFalse(hasRefreshed);
assertTrue(shard.isSearchIdle());
} else {
if (randomTimeValue == null) {
assertFalse(shard.isSearchIdle());
}
// we can't assert on hasRefreshed since it might have been refreshed in the background on the shard concurrently.
// and if the background refresh wins the refresh race (both call maybeRefresh), the document might not be visible
// until the background refresh is done.
if (hasRefreshed == false) {
ensureNoPendingScheduledRefresh(indexService.getThreadPool());
}
}

CountDownLatch started = new CountDownLatch(1);
Thread t = new Thread(() -> {
started.countDown();
do {

} while (count.applyAsLong(totalNumDocs.get()) != totalNumDocs.get());
});
t.start();
started.await();
assertThat(count.applyAsLong(totalNumDocs.get()), equalTo(1L));
for (int i = 1; i < numDocs; i++) {
client().prepareIndex("test", "test", "" + i).setSource("{\"foo\" : \"bar\"}", XContentType.JSON)
.execute(new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
indexingDone.countDown();
}

@Override
public void onFailure(Exception e) {
indexingDone.countDown();
throw new AssertionError(e);
}
});
}
indexingDone.await();
t.join();
}

public void testPendingRefreshWithIntervalChange() throws Exception {
Settings.Builder builder = Settings.builder();
builder.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO);
IndexService indexService = createIndex("test", builder.build());
assertFalse(indexService.getIndexSettings().isExplicitRefresh());
ensureGreen();
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
IndexShard shard = indexService.getShard(0);
assertFalse(shard.scheduledRefresh());
assertTrue(shard.isSearchIdle());
CountDownLatch refreshLatch = new CountDownLatch(1);
client().admin().indices().prepareRefresh()
.execute(ActionListener.wrap(refreshLatch::countDown));// async on purpose to make sure it happens concurrently
assertHitCount(client().prepareSearch().get(), 1);
client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
assertFalse(shard.scheduledRefresh());

// now disable background refresh and make sure the refresh happens
CountDownLatch updateSettingsLatch = new CountDownLatch(1);
client().admin().indices()
.prepareUpdateSettings("test")
.setSettings(Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1).build())
.execute(ActionListener.wrap(updateSettingsLatch::countDown));
assertHitCount(client().prepareSearch().get(), 2);
// wait for both to ensure we don't have in-flight operations
updateSettingsLatch.await();
refreshLatch.await();
// We need to ensure a `scheduledRefresh` triggered by the internal refresh setting update is executed before we index a new doc;
// otherwise, it will compete to call `Engine#maybeRefresh` with the `scheduledRefresh` that we are going to verify.
ensureNoPendingScheduledRefresh(indexService.getThreadPool());
client().prepareIndex("test", "test", "2").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
assertTrue(shard.scheduledRefresh());
assertTrue(shard.isSearchIdle());
assertHitCount(client().prepareSearch().get(), 3);
}

private void ensureNoPendingScheduledRefresh(ThreadPool threadPool) {
// We can make sure that all scheduled refresh tasks are done by submitting *maximumPoolSize* blocking tasks,
// then wait until all of them completed. Note that using ThreadPoolStats is not watertight as both queue and
// active count can be 0 when ThreadPoolExecutor just takes a task out the queue but before marking it active.
ThreadPoolExecutor refreshThreadPoolExecutor = (ThreadPoolExecutor) threadPool.executor(ThreadPool.Names.REFRESH);
int maximumPoolSize = refreshThreadPoolExecutor.getMaximumPoolSize();
Phaser barrier = new Phaser(maximumPoolSize + 1);
for (int i = 0; i < maximumPoolSize; i++) {
refreshThreadPoolExecutor.execute(barrier::arriveAndAwaitAdvance);
}
barrier.arriveAndAwaitAdvance();
}

public void testGlobalCheckpointListeners() throws Exception {
createIndex("test", Settings.builder()
.put("index.number_of_shards", 1)
Expand Down
193 changes: 193 additions & 0 deletions server/src/test/java/org/elasticsearch/index/shard/SearchIdleIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.shard;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.MultiGetRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.IndexService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.test.ESSingleNodeTestCase;
import org.elasticsearch.threadpool.ThreadPool;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntToLongFunction;

import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoSearchHits;
import static org.hamcrest.Matchers.equalTo;

public class SearchIdleIT extends ESSingleNodeTestCase {

public void testAutomaticRefreshSearch() throws InterruptedException {
runTestAutomaticRefresh(numDocs -> client().prepareSearch("test").get().getHits().getTotalHits().value);
}

public void testAutomaticRefreshGet() throws InterruptedException {
runTestAutomaticRefresh(
numDocs -> {
int count = 0;
for (int i = 0; i < numDocs; i++) {
final GetRequest request = new GetRequest();
request.realtime(false);
request.index("test");
request.id("" + i);
if (client().get(request).actionGet().isExists()) {
count++;
}
}
return count;
});
}

public void testAutomaticRefreshMultiGet() throws InterruptedException {
runTestAutomaticRefresh(
numDocs -> {
final MultiGetRequest request = new MultiGetRequest();
request.realtime(false);
for (int i = 0; i < numDocs; i++) {
request.add("test", "" + i);
}
return Arrays.stream(client().multiGet(request).actionGet().getResponses()).filter(r -> r.getResponse().isExists()).count();
});
}

private void runTestAutomaticRefresh(final IntToLongFunction count) throws InterruptedException {
TimeValue randomTimeValue = randomFrom(random(), null, TimeValue.ZERO, TimeValue.timeValueMillis(randomIntBetween(0, 1000)));
Settings.Builder builder = Settings.builder();
if (randomTimeValue != null) {
builder.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), randomTimeValue);
}
IndexService indexService = createIndex("test", builder.build());
assertFalse(indexService.getIndexSettings().isExplicitRefresh());
ensureGreen();
AtomicInteger totalNumDocs = new AtomicInteger(Integer.MAX_VALUE);
assertNoSearchHits(client().prepareSearch().get());
int numDocs = scaledRandomIntBetween(25, 100);
totalNumDocs.set(numDocs);
CountDownLatch indexingDone = new CountDownLatch(numDocs);
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
indexingDone.countDown(); // one doc is indexed above blocking
IndexShard shard = indexService.getShard(0);
boolean hasRefreshed = shard.scheduledRefresh();
if (randomTimeValue == TimeValue.ZERO) {
// with ZERO we are guaranteed to see the doc since we will wait for a refresh in the background
assertFalse(hasRefreshed);
assertTrue(shard.isSearchIdle());
} else {
if (randomTimeValue == null) {
assertFalse(shard.isSearchIdle());
}
// we can't assert on hasRefreshed since it might have been refreshed in the background on the shard concurrently.
// and if the background refresh wins the refresh race (both call maybeRefresh), the document might not be visible
// until the background refresh is done.
if (hasRefreshed == false) {
ensureNoPendingScheduledRefresh(indexService.getThreadPool());
}
}

CountDownLatch started = new CountDownLatch(1);
Thread t = new Thread(() -> {
started.countDown();
do {

} while (count.applyAsLong(totalNumDocs.get()) != totalNumDocs.get());
});
t.start();
started.await();
assertThat(count.applyAsLong(totalNumDocs.get()), equalTo(1L));
for (int i = 1; i < numDocs; i++) {
client().prepareIndex("test", "test", "" + i).setSource("{\"foo\" : \"bar\"}", XContentType.JSON)
.execute(new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
indexingDone.countDown();
}

@Override
public void onFailure(Exception e) {
indexingDone.countDown();
throw new AssertionError(e);
}
});
}
indexingDone.await();
t.join();
}


public void testPendingRefreshWithIntervalChange() throws Exception {
Settings.Builder builder = Settings.builder();
builder.put(IndexSettings.INDEX_SEARCH_IDLE_AFTER.getKey(), TimeValue.ZERO);
IndexService indexService = createIndex("test", builder.build());
assertFalse(indexService.getIndexSettings().isExplicitRefresh());
ensureGreen();
client().prepareIndex("test", "test", "0").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
IndexShard shard = indexService.getShard(0);
assertFalse(shard.scheduledRefresh());
assertTrue(shard.isSearchIdle());
CountDownLatch refreshLatch = new CountDownLatch(1);
client().admin().indices().prepareRefresh()
.execute(ActionListener.wrap(refreshLatch::countDown));// async on purpose to make sure it happens concurrently
assertHitCount(client().prepareSearch().get(), 1);
client().prepareIndex("test", "test", "1").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
assertFalse(shard.scheduledRefresh());

// now disable background refresh and make sure the refresh happens
CountDownLatch updateSettingsLatch = new CountDownLatch(1);
client().admin().indices()
.prepareUpdateSettings("test")
.setSettings(Settings.builder().put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), -1).build())
.execute(ActionListener.wrap(updateSettingsLatch::countDown));
assertHitCount(client().prepareSearch().get(), 2);
// wait for both to ensure we don't have in-flight operations
updateSettingsLatch.await();
refreshLatch.await();
// We need to ensure a `scheduledRefresh` triggered by the internal refresh setting update is executed before we index a new doc;
// otherwise, it will compete to call `Engine#maybeRefresh` with the `scheduledRefresh` that we are going to verify.
ensureNoPendingScheduledRefresh(indexService.getThreadPool());
client().prepareIndex("test", "test", "2").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).get();
assertTrue(shard.scheduledRefresh());
assertTrue(shard.isSearchIdle());
assertHitCount(client().prepareSearch().get(), 3);
}

private void ensureNoPendingScheduledRefresh(ThreadPool threadPool) {
// We can make sure that all scheduled refresh tasks are done by submitting *maximumPoolSize* blocking tasks,
// then wait until all of them completed. Note that using ThreadPoolStats is not watertight as both queue and
// active count can be 0 when ThreadPoolExecutor just takes a task out the queue but before marking it active.
ThreadPoolExecutor refreshThreadPoolExecutor = (ThreadPoolExecutor) threadPool.executor(ThreadPool.Names.REFRESH);
int maximumPoolSize = refreshThreadPoolExecutor.getMaximumPoolSize();
Phaser barrier = new Phaser(maximumPoolSize + 1);
for (int i = 0; i < maximumPoolSize; i++) {
refreshThreadPoolExecutor.execute(barrier::arriveAndAwaitAdvance);
}
barrier.arriveAndAwaitAdvance();
}

}

0 comments on commit ba90ad9

Please sign in to comment.