Skip to content

Commit

Permalink
Fix record skipping when querying paginated data across shards (#3061)
Browse files Browse the repository at this point in the history
* Add reproducer for pagination skipping bug

Signed-off-by: Simeon Widdis <[email protected]>

* Fix the bug

Signed-off-by: Simeon Widdis <[email protected]>

* Add additional sorts to other locations

Signed-off-by: Simeon Widdis <[email protected]>

* Use constant for ID field

Signed-off-by: Simeon Widdis <[email protected]>

* Fix broken cursor test

Signed-off-by: Simeon Widdis <[email protected]>

* Apply spotless

Signed-off-by: Simeon Widdis <[email protected]>

---------

Signed-off-by: Simeon Widdis <[email protected]>
  • Loading branch information
Swiddis authored Oct 13, 2024
1 parent 4c44f56 commit e838e46
Show file tree
Hide file tree
Showing 9 changed files with 166 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,15 @@ public enum Index {
"calcs",
getMappingFile("calcs_index_mappings.json"),
"src/test/resources/calcs.json"),
// Calcs has enough records for shards to be interesting, but updating the existing mapping
// with shards in-place breaks existing tests. Aside from introducing a primary shard
// setting > 1, this index is identical to CALCS.
CALCS_WITH_SHARDS(
TestsConstants.TEST_INDEX_CALCS,
"calcs",
getMappingFile("calcs_with_shards_index_mappings.json"),
"src/test/resources/calcs.json"),
DATE_FORMATS(
TestsConstants.TEST_INDEX_DATE_FORMATS,
"date_formats",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@

package org.opensearch.sql.sql;

import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_PHRASE;
import static org.opensearch.sql.legacy.TestsConstants.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.json.JSONObject;
import org.junit.After;
import org.junit.Test;
Expand All @@ -18,6 +20,7 @@ public class PaginationWindowIT extends SQLIntegTestCase {
@Override
public void init() throws IOException {
// PHRASE backs the result-window tests; CALCS_WITH_SHARDS (same data as CALCS but with
// 4 primary shards) backs the multi-shard pagination regression test below.
loadIndex(Index.PHRASE);
loadIndex(Index.CALCS_WITH_SHARDS);
}

@After
Expand Down Expand Up @@ -92,4 +95,41 @@ public void testFetchSizeLargerThanResultWindowFails() throws IOException {
() -> executeQueryTemplate("SELECT * FROM %s", TEST_INDEX_PHRASE, window + 1));
resetMaxResultWindow(TEST_INDEX_PHRASE);
}

@Test
public void testMultiShardPagesEqualsActualData() throws IOException {
// Regression test: when pulling unordered data from an index with multiple shards, records
// were lost if the fetchSize was not a multiple of the shard count. Verify that, for an
// index with 4 shards, one page of 10 records equals two cursor pages of 5 records.
var query = "SELECT key from " + TEST_INDEX_CALCS;

// Baseline: every row fetched in a single page.
var expectedResponse = new JSONObject(executeFetchQuery(query, 10, "jdbc"));
List<String> expectedKeys = datarowKeys(expectedResponse);

// Same data fetched as two pages via cursor pagination.
var actualPage1 = new JSONObject(executeFetchQuery(query, 5, "jdbc"));
var actualPage2 = executeCursorQuery(actualPage1.getString("cursor"));

List<String> actualKeys = new ArrayList<>(datarowKeys(actualPage1));
actualKeys.addAll(datarowKeys(actualPage2));

assertEquals(expectedKeys, actualKeys);
}

/**
 * Extracts the first column of every "datarows" entry in a JDBC-format query response.
 *
 * @param response a JDBC-format response containing a "datarows" array of row arrays
 * @return the first column of each row, in response order
 */
private static List<String> datarowKeys(JSONObject response) {
List<String> keys = new ArrayList<>();
var rows = response.getJSONArray("datarows");
for (int i = 0; i < rows.length(); i++) {
keys.add(rows.getJSONArray(i).getString(0));
}
return keys;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
{
"mappings" : {
"properties" : {
"key" : {
"type" : "keyword"
},
"num0" : {
"type" : "double"
},
"num1" : {
"type" : "double"
},
"num2" : {
"type" : "double"
},
"num3" : {
"type" : "double"
},
"num4" : {
"type" : "double"
},
"str0" : {
"type" : "keyword"
},
"str1" : {
"type" : "keyword"
},
"str2" : {
"type" : "keyword"
},
"str3" : {
"type" : "keyword"
},
"int0" : {
"type" : "integer"
},
"int1" : {
"type" : "integer"
},
"int2" : {
"type" : "integer"
},
"int3" : {
"type" : "integer"
},
"bool0" : {
"type" : "boolean"
},
"bool1" : {
"type" : "boolean"
},
"bool2" : {
"type" : "boolean"
},
"bool3" : {
"type" : "boolean"
},
"date0" : {
"type" : "date",
"format": "year_month_day"
},
"date1" : {
"type" : "date",
"format": "year_month_day"
},
"date2" : {
"type" : "date",
"format": "year_month_day"
},
"date3" : {
"type" : "date",
"format": "year_month_day"
},
"time0" : {
"type" : "date",
"format": "date_time_no_millis"
},
"time1" : {
"type" : "date",
"format": "hour_minute_second"
},
"datetime0" : {
"type" : "date",
"format": "date_time_no_millis"
},
"datetime1" : {
"type" : "date"
},
"zzz" : {
"type" : "keyword"
}
}
},
"settings": {
"index": {
"number_of_shards": 4
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static org.opensearch.search.sort.SortOrder.ASC;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;
import static org.opensearch.sql.opensearch.storage.OpenSearchIndex.METADATA_FIELD_ID;

import java.io.IOException;
import org.apache.logging.log4j.LogManager;
Expand All @@ -19,6 +20,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.SearchHits;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.sql.legacy.domain.Select;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.exception.SqlParseException;
Expand Down Expand Up @@ -70,6 +72,7 @@ public SearchResponse getResponseWithHits(
boolean ordered = select.isOrderdSelect();
if (!ordered) {
request.addSort(DOC_FIELD_NAME, ASC);
request.addSort(METADATA_FIELD_ID, SortOrder.ASC);
}
// Set PIT
request.setPointInTime(new PointInTimeBuilder(pit.getPitId()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.legacy.executor.join;

import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;
import static org.opensearch.sql.opensearch.storage.OpenSearchIndex.METADATA_FIELD_ID;

import com.google.common.collect.ImmutableMap;
import java.io.IOException;
Expand Down Expand Up @@ -39,6 +40,7 @@ public static SearchResponse scrollOneTimeWithHits(
boolean ordered = originalSelect.isOrderdSelect();
if (!ordered) {
scrollRequest.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC);
scrollRequest.addSort(METADATA_FIELD_ID, SortOrder.ASC);
}
SearchResponse responseWithHits = scrollRequest.get();
// on ordered select - not using SCAN , elastic returns hits on first scroll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.legacy.query;

import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;
import static org.opensearch.sql.opensearch.storage.OpenSearchIndex.METADATA_FIELD_ID;

import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr;
Expand Down Expand Up @@ -110,6 +111,7 @@ public void checkAndSetScroll() {
boolean ordered = select.isOrderdSelect();
if (!ordered) {
request.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC);
request.addSort(METADATA_FIELD_ID, SortOrder.ASC);
}
// Request also requires PointInTime, but we should create pit while execution.
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.opensearch.sql.legacy.query.planner.physical.node.pointInTime;

import static org.opensearch.sql.opensearch.storage.OpenSearchIndex.METADATA_FIELD_ID;

import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.sort.FieldSortBuilder;
Expand Down Expand Up @@ -42,6 +44,7 @@ protected void loadFirstBatch() {
request
.getRequestBuilder()
.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)
.addSort(METADATA_FIELD_ID, SortOrder.ASC)
.setSize(pageSize)
.setTimeout(TimeValue.timeValueSeconds(timeout))
.setPointInTime(new PointInTimeBuilder(pitId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.sql.legacy.query.planner.physical.node.scroll;

import static org.opensearch.sql.opensearch.storage.OpenSearchIndex.METADATA_FIELD_ID;

import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.sort.FieldSortBuilder;
Expand Down Expand Up @@ -40,6 +42,7 @@ protected void loadFirstBatch() {
request
.getRequestBuilder()
.addSort(FieldSortBuilder.DOC_FIELD_NAME, SortOrder.ASC)
.addSort(METADATA_FIELD_ID, SortOrder.ASC)
.setSize(pageSize)
.setScroll(TimeValue.timeValueSeconds(timeout))
.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.opensearch.core.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS;
import static org.opensearch.search.sort.FieldSortBuilder.DOC_FIELD_NAME;
import static org.opensearch.search.sort.SortOrder.ASC;
import static org.opensearch.sql.opensearch.storage.OpenSearchIndex.METADATA_FIELD_ID;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -189,6 +190,9 @@ public OpenSearchResponse searchWithPIT(Function<SearchRequest, SearchResponse>
// Set sort field for search_after
if (this.sourceBuilder.sorts() == null) {
this.sourceBuilder.sort(DOC_FIELD_NAME, ASC);
// Workaround to preserve sort location more exactly,
// see https://github.com/opensearch-project/sql/pull/3061
this.sourceBuilder.sort(METADATA_FIELD_ID, ASC);
}
SearchRequest searchRequest = new SearchRequest().source(this.sourceBuilder);
this.searchResponse = searchAction.apply(searchRequest);
Expand Down

0 comments on commit e838e46

Please sign in to comment.