Skip to content

Commit

Permalink
Fixing concurrent search tests with one slice (#11071)
Browse files Browse the repository at this point in the history
Signed-off-by: Ticheng Lin <[email protected]>
  • Loading branch information
ticheng-aws committed Nov 2, 2023
1 parent 8673fa9 commit 3ce73bf
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,17 @@ protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build();
}

public void testSimpleMultiSearch() {
public void testSimpleMultiSearch() throws Exception {
assumeFalse(
"Concurrent search case muted pending fix: https://github.com/opensearch-project/OpenSearch/issues/11064",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
createIndex("test");
ensureGreen();
client().prepareIndex("test").setId("1").setSource("field", "xxx").get();
client().prepareIndex("test").setId("2").setSource("field", "yyy").get();
refresh();
indexRandomForConcurrentSearch("test");
MultiSearchResponse response = client().prepareMultiSearch()
.add(client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "xxx")))
.add(client().prepareSearch("test").setQuery(QueryBuilders.termQuery("field", "yyy")))
Expand All @@ -94,13 +99,18 @@ public void testSimpleMultiSearch() {
assertFirstHit(response.getResponses()[1].getResponse(), hasId("2"));
}

public void testSimpleMultiSearchMoreRequests() {
public void testSimpleMultiSearchMoreRequests() throws Exception {
assumeFalse(
"Concurrent search case muted pending fix: https://github.com/opensearch-project/OpenSearch/issues/11064",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
createIndex("test");
int numDocs = randomIntBetween(0, 16);
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("test").setId(Integer.toString(i)).setSource("{}", MediaTypeRegistry.JSON).get();
}
refresh();
indexRandomForConcurrentSearch("test");

int numSearchRequests = randomIntBetween(1, 64);
MultiSearchRequest request = new MultiSearchRequest();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ protected Settings featureFlagSettings() {
}

public void testSimpleNested() throws Exception {
assumeFalse(
"Concurrent search case muted pending fix: https://github.com/opensearch-project/OpenSearch/issues/11065",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
assertAcked(prepareCreate("test").setMapping("nested1", "type=nested"));
ensureGreen();

Expand Down Expand Up @@ -132,6 +136,7 @@ public void testSimpleNested() throws Exception {
refresh();
// check the numDocs
assertDocumentCount("test", 3);
indexRandomForConcurrentSearch("test");

searchResponse = client().prepareSearch("test").setQuery(termQuery("n_field1", "n_value1_1")).get();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(0L));
Expand Down Expand Up @@ -293,6 +298,7 @@ public void testMultiNested() throws Exception {
refresh();
// check the numDocs
assertDocumentCount("test", 7);
indexRandomForConcurrentSearch("test");

// do some multi nested queries
SearchResponse searchResponse = client().prepareSearch("test")
Expand Down Expand Up @@ -485,6 +491,7 @@ public void testExplain() throws Exception {
)
.setRefreshPolicy(IMMEDIATE)
.get();
indexRandomForConcurrentSearch("test");

SearchResponse searchResponse = client().prepareSearch("test")
.setQuery(nestedQuery("nested1", termQuery("nested1.n_field1", "n_value1"), ScoreMode.Total))
Expand All @@ -498,6 +505,10 @@ public void testExplain() throws Exception {
}

public void testSimpleNestedSorting() throws Exception {
assumeFalse(
"Concurrent search case muted pending fix: https://github.com/opensearch-project/OpenSearch/issues/11065",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
assertAcked(
prepareCreate("test").setSettings(Settings.builder().put(indexSettings()).put("index.refresh_interval", -1))
.setMapping(
Expand Down Expand Up @@ -567,6 +578,7 @@ public void testSimpleNestedSorting() throws Exception {
)
.get();
refresh();
indexRandomForConcurrentSearch("test");

SearchResponse searchResponse = client().prepareSearch("test")
.setQuery(QueryBuilders.matchAllQuery())
Expand Down Expand Up @@ -596,6 +608,10 @@ public void testSimpleNestedSorting() throws Exception {
}

public void testSimpleNestedSortingWithNestedFilterMissing() throws Exception {
assumeFalse(
"Concurrent search case muted pending fix: https://github.com/opensearch-project/OpenSearch/issues/11065",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
assertAcked(
prepareCreate("test").setSettings(Settings.builder().put(indexSettings()).put("index.refresh_interval", -1))
.setMapping(
Expand Down Expand Up @@ -656,6 +672,7 @@ public void testSimpleNestedSortingWithNestedFilterMissing() throws Exception {
.get();
// Doc with missing nested docs if nested filter is used
refresh();
indexRandomForConcurrentSearch("test");
client().prepareIndex("test")
.setId("3")
.setSource(
Expand All @@ -675,6 +692,7 @@ public void testSimpleNestedSortingWithNestedFilterMissing() throws Exception {
)
.get();
refresh();
indexRandomForConcurrentSearch("test");

SearchRequestBuilder searchRequestBuilder = client().prepareSearch("test")
.setQuery(QueryBuilders.matchAllQuery())
Expand Down Expand Up @@ -727,6 +745,10 @@ public void testSimpleNestedSortingWithNestedFilterMissing() throws Exception {
}

public void testNestedSortWithMultiLevelFiltering() throws Exception {
assumeFalse(
"Concurrent search case muted pending fix: https://github.com/opensearch-project/OpenSearch/issues/11065",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
assertAcked(
prepareCreate("test").setMapping(
"{\n"
Expand Down Expand Up @@ -863,6 +885,7 @@ public void testNestedSortWithMultiLevelFiltering() throws Exception {
)
.get();
refresh();
indexRandomForConcurrentSearch("test");

// access id = 1, read, max value, asc, should use grault and quxx
SearchResponse searchResponse = client().prepareSearch()
Expand Down Expand Up @@ -968,6 +991,10 @@ public void testNestedSortWithMultiLevelFiltering() throws Exception {

// https://github.com/elastic/elasticsearch/issues/31554
public void testLeakingSortValues() throws Exception {
assumeFalse(
"Concurrent search case muted pending fix: https://github.com/opensearch-project/OpenSearch/issues/11065",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
assertAcked(
prepareCreate("test").setSettings(Settings.builder().put("number_of_shards", 1))
.setMapping(
Expand Down Expand Up @@ -1035,6 +1062,7 @@ public void testLeakingSortValues() throws Exception {
.get();

refresh();
indexRandomForConcurrentSearch("test");

SearchResponse searchResponse = client().prepareSearch()
.setQuery(termQuery("_id", 2))
Expand All @@ -1056,6 +1084,10 @@ public void testLeakingSortValues() throws Exception {
}

public void testSortNestedWithNestedFilter() throws Exception {
assumeFalse(
"Concurrent search case muted pending fix: https://github.com/opensearch-project/OpenSearch/issues/11065",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
assertAcked(
prepareCreate("test").setMapping(
XContentFactory.jsonBuilder()
Expand Down Expand Up @@ -1216,6 +1248,7 @@ public void testSortNestedWithNestedFilter() throws Exception {
.get();
refresh();

indexRandomForConcurrentSearch("test");
// Without nested filter
SearchResponse searchResponse = client().prepareSearch()
.setQuery(matchAllQuery())
Expand Down Expand Up @@ -1453,6 +1486,10 @@ public void testSortNestedWithNestedFilter() throws Exception {

// Issue #9305
public void testNestedSortingWithNestedFilterAsFilter() throws Exception {
assumeFalse(
"Concurrent search case muted pending fix: https://github.com/opensearch-project/OpenSearch/issues/11065",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
assertAcked(
prepareCreate("test").setMapping(
jsonBuilder().startObject()
Expand Down Expand Up @@ -1595,6 +1632,7 @@ public void testNestedSortingWithNestedFilterAsFilter() throws Exception {
.get();
assertTrue(indexResponse2.getShardInfo().getSuccessful() > 0);
refresh();
indexRandomForConcurrentSearch("test");

SearchResponse searchResponse = client().prepareSearch("test")
.addSort(SortBuilders.fieldSort("users.first").setNestedPath("users").order(SortOrder.ASC))
Expand Down Expand Up @@ -1627,6 +1665,7 @@ public void testCheckFixedBitSetCache() throws Exception {
client().prepareIndex("test").setId("1").setSource("field", "value").get();
refresh();
ensureSearchable("test");
indexRandomForConcurrentSearch("test");

// No nested mapping yet, there shouldn't be anything in the fixed bit set cache
ClusterStatsResponse clusterStatsResponse = client().admin().cluster().prepareClusterStats().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public void clearIndex() {
public void testPit() throws Exception {
CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true);
request.setIndices(new String[] { "index" });
indexRandomForConcurrentSearch("index");
ActionFuture<CreatePitResponse> execute = client().execute(CreatePitAction.INSTANCE, request);
CreatePitResponse pitResponse = execute.get();
SearchResponse searchResponse = client().prepareSearch("index")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.ParameterizedOpenSearchIntegTestCase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -99,7 +98,7 @@ public Settings nodeSettings(int nodeOrdinal) {
}

// see #2896
public void testStopOneNodePreferenceWithRedState() throws IOException {
public void testStopOneNodePreferenceWithRedState() throws Exception {
assertAcked(
prepareCreate("test").setSettings(
Settings.builder().put("index.number_of_shards", cluster().numDataNodes() + 2).put("index.number_of_replicas", 0)
Expand All @@ -110,6 +109,7 @@ public void testStopOneNodePreferenceWithRedState() throws IOException {
client().prepareIndex("test").setId("" + i).setSource("field1", "value1").get();
}
refresh();
indexRandomForConcurrentSearch("test");
internalCluster().stopRandomDataNode();
client().admin().cluster().prepareHealth().setWaitForStatus(ClusterHealthStatus.RED).get();
String[] preferences = new String[] {
Expand Down Expand Up @@ -138,7 +138,7 @@ public void testStopOneNodePreferenceWithRedState() throws IOException {
assertThat("_only_local", searchResponse.getFailedShards(), greaterThanOrEqualTo(0));
}

public void testNoPreferenceRandom() {
public void testNoPreferenceRandom() throws Exception {
assertAcked(
prepareCreate("test").setSettings(
// this test needs at least a replica to make sure two consecutive searches go to two different copies of the same data
Expand All @@ -149,6 +149,7 @@ public void testNoPreferenceRandom() {

client().prepareIndex("test").setSource("field1", "value1").get();
refresh();
indexRandomForConcurrentSearch("test");

final Client client = internalCluster().smartClient();
SearchResponse searchResponse = client.prepareSearch("test").setQuery(matchAllQuery()).get();
Expand All @@ -159,12 +160,17 @@ public void testNoPreferenceRandom() {
assertThat(firstNodeId, not(equalTo(secondNodeId)));
}

public void testSimplePreference() {
public void testSimplePreference() throws Exception {
assumeFalse(
"Concurrent search case muted pending fix: https://github.com/opensearch-project/OpenSearch/issues/11066",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
client().admin().indices().prepareCreate("test").setSettings("{\"number_of_replicas\": 1}", MediaTypeRegistry.JSON).get();
ensureGreen();

client().prepareIndex("test").setSource("field1", "value1").get();
refresh();
indexRandomForConcurrentSearch("test");

SearchResponse searchResponse = client().prepareSearch().setQuery(matchAllQuery()).get();
assertThat(searchResponse.getHits().getTotalHits().value, equalTo(1L));
Expand Down Expand Up @@ -201,7 +207,7 @@ public void testThatSpecifyingNonExistingNodesReturnsUsefulError() {
}
}

public void testNodesOnlyRandom() {
public void testNodesOnlyRandom() throws Exception {
assertAcked(
prepareCreate("test").setSettings(
// this test needs at least a replica to make sure two consecutive searches go to two different copies of the same data
Expand All @@ -211,6 +217,7 @@ public void testNodesOnlyRandom() {
ensureGreen();
client().prepareIndex("test").setSource("field1", "value1").get();
refresh();
indexRandomForConcurrentSearch("test");

final Client client = internalCluster().smartClient();
// multiple wildchar to cover multi-param usecase
Expand Down Expand Up @@ -262,14 +269,17 @@ private void assertSearchOnRandomNodes(SearchRequestBuilder request) {
assertThat(hitNodes.size(), greaterThan(1));
}

public void testCustomPreferenceUnaffectedByOtherShardMovements() {
public void testCustomPreferenceUnaffectedByOtherShardMovements() throws Exception {

/*
* Custom preferences can be used to encourage searches to go to a consistent set of shard copies, meaning that other copies' data
* is rarely touched and can be dropped from the filesystem cache. This works best if the set of shards searched doesn't change
* unnecessarily, so this test verifies a consistent routing even as other shards are created/relocated/removed.
*/

assumeFalse(
"Concurrent search case muted pending fix: https://github.com/opensearch-project/OpenSearch/issues/11066",
internalCluster().clusterService().getClusterSettings().get(CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING)
);
assertAcked(
prepareCreate("test").setSettings(
Settings.builder()
Expand All @@ -281,6 +291,7 @@ public void testCustomPreferenceUnaffectedByOtherShardMovements() {
ensureGreen();
client().prepareIndex("test").setSource("field1", "value1").get();
refresh();
indexRandomForConcurrentSearch("test");

final String customPreference = randomAlphaOfLength(10);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ public void testDocWithAllTypes() throws Exception {
String docBody = copyToStringFromClasspath("/org/opensearch/search/query/all-example-document.json");
reqs.add(client().prepareIndex("test").setId("1").setSource(docBody, MediaTypeRegistry.JSON));
indexRandom(true, false, reqs);
indexRandomForConcurrentSearch("test");

SearchResponse resp = client().prepareSearch("test").setQuery(queryStringQuery("foo")).get();
assertHits(resp.getHits(), "1");
Expand Down Expand Up @@ -225,6 +226,7 @@ public void testKeywordWithWhitespace() throws Exception {
reqs.add(client().prepareIndex("test").setId("2").setSource("f1", "bar"));
reqs.add(client().prepareIndex("test").setId("3").setSource("f1", "foo bar"));
indexRandom(true, false, reqs);
indexRandomForConcurrentSearch("test");

SearchResponse resp = client().prepareSearch("test").setQuery(queryStringQuery("foo")).get();
assertHits(resp.getHits(), "3");
Expand All @@ -245,6 +247,7 @@ public void testRegexCaseInsensitivity() throws Exception {
indexRequests.add(client().prepareIndex("messages").setId("1").setSource("message", "message: this is a TLS handshake"));
indexRequests.add(client().prepareIndex("messages").setId("2").setSource("message", "message: this is a tcp handshake"));
indexRandom(true, false, indexRequests);
indexRandomForConcurrentSearch("messages");

SearchResponse response = client().prepareSearch("messages").setQuery(queryStringQuery("/TLS/").defaultField("message")).get();
assertNoFailures(response);
Expand Down Expand Up @@ -282,6 +285,7 @@ public void testAllFields() throws Exception {
List<IndexRequestBuilder> reqs = new ArrayList<>();
reqs.add(client().prepareIndex("test_1").setId("1").setSource("f1", "foo", "f2", "eggplant"));
indexRandom(true, false, reqs);
indexRandomForConcurrentSearch("test_1");

SearchResponse resp = client().prepareSearch("test_1")
.setQuery(queryStringQuery("foo eggplant").defaultOperator(Operator.AND))
Expand Down Expand Up @@ -374,6 +378,7 @@ public void testLimitOnExpandedFields() throws Exception {

client().prepareIndex("testindex").setId("1").setSource("field_A0", "foo bar baz").get();
refresh();
indexRandomForConcurrentSearch("testindex");

// single field shouldn't trigger the limit
doAssertOneHitForQueryString("field_A0:foo");
Expand Down Expand Up @@ -465,6 +470,7 @@ public void testFieldAliasOnDisallowedFieldType() throws Exception {
List<IndexRequestBuilder> indexRequests = new ArrayList<>();
indexRequests.add(client().prepareIndex("test").setId("1").setSource("f3", "text", "f2", "one"));
indexRandom(true, false, indexRequests);
indexRandomForConcurrentSearch("test");

// The wildcard field matches aliases for both a text and geo_point field.
// By default, the geo_point field should be ignored when building the query.
Expand Down
Loading

0 comments on commit 3ce73bf

Please sign in to comment.