Skip to content

Commit

Permalink
Add pit for pagination query (#2940)
Browse files Browse the repository at this point in the history
* Add pit for join queries (#2703)

* Add search after for join

Signed-off-by: Rupal Mahajan <[email protected]>

* Enable search after by default

Signed-off-by: Rupal Mahajan <[email protected]>

* Add pit

Signed-off-by: Rupal Mahajan <[email protected]>

* nit

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix tests

Signed-off-by: Rupal Mahajan <[email protected]>

* ignore joinWithGeoIntersectNL

Signed-off-by: Rupal Mahajan <[email protected]>

* Rerun CI with scroll

Signed-off-by: Rupal Mahajan <[email protected]>

* Remove unused code and retrigger CI with search_after true

Signed-off-by: Rupal Mahajan <[email protected]>

* Address comments

Signed-off-by: Rupal Mahajan <[email protected]>

* Remove unused code change

Signed-off-by: Rupal Mahajan <[email protected]>

* Update pit keep alive time with SQL_CURSOR_KEEP_ALIVE

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix scroll condition

Signed-off-by: Rupal Mahajan <[email protected]>

* nit

Signed-off-by: Rupal Mahajan <[email protected]>

* Add pit before query execution

Signed-off-by: Rupal Mahajan <[email protected]>

* nit

Signed-off-by: Rupal Mahajan <[email protected]>

* Move pit from join request builder to executor

Signed-off-by: Rupal Mahajan <[email protected]>

* Remove unused methods

Signed-off-by: Rupal Mahajan <[email protected]>

* Add pit in parent class's run()

Signed-off-by: Rupal Mahajan <[email protected]>

* Add comment for fetching subsequent result in NestedLoopsElasticExecutor

Signed-off-by: Rupal Mahajan <[email protected]>

* Update comment

Signed-off-by: Rupal Mahajan <[email protected]>

* Add javadoc for pit handler

Signed-off-by: Rupal Mahajan <[email protected]>

* Add pit interface

Signed-off-by: Rupal Mahajan <[email protected]>

* Add pit handler unit test

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix failed unit test CI

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix spotless error

Signed-off-by: Rupal Mahajan <[email protected]>

* Rename pit class and add logs

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix pit delete unit test

Signed-off-by: Rupal Mahajan <[email protected]>

---------

Signed-off-by: Rupal Mahajan <[email protected]>

* Add pit for multi query (#2753)

* Add search after for join

Signed-off-by: Rupal Mahajan <[email protected]>

* Enable search after by default

Signed-off-by: Rupal Mahajan <[email protected]>

* Add pit

Signed-off-by: Rupal Mahajan <[email protected]>

* nit

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix tests

Signed-off-by: Rupal Mahajan <[email protected]>

* ignore joinWithGeoIntersectNL

Signed-off-by: Rupal Mahajan <[email protected]>

* Rerun CI with scroll

Signed-off-by: Rupal Mahajan <[email protected]>

* draft

Signed-off-by: Rupal Mahajan <[email protected]>

* Remove unused code and retrigger CI with search_after true

Signed-off-by: Rupal Mahajan <[email protected]>

* Address comments

Signed-off-by: Rupal Mahajan <[email protected]>

* Remove unused code change

Signed-off-by: Rupal Mahajan <[email protected]>

* Update pit keep alive time with SQL_CURSOR_KEEP_ALIVE

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix scroll condition

Signed-off-by: Rupal Mahajan <[email protected]>

* nit

Signed-off-by: Rupal Mahajan <[email protected]>

* Add pit before query execution

Signed-off-by: Rupal Mahajan <[email protected]>

* Refactor get response with pit method

Signed-off-by: Rupal Mahajan <[email protected]>

* Update remaining scroll search calls

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix integ test failures

Signed-off-by: Rupal Mahajan <[email protected]>

* nit

Signed-off-by: Rupal Mahajan <[email protected]>

* Move pit from join request builder to executor

Signed-off-by: Rupal Mahajan <[email protected]>

* Remove unused methods

Signed-off-by: Rupal Mahajan <[email protected]>

* Move pit from request to executor

Signed-off-by: Rupal Mahajan <[email protected]>

* Fix pit.delete call missed while merge

Signed-off-by: Rupal Mahajan <[email protected]>

* Move getResponseWithHits method to util class

Signed-off-by: Rupal Mahajan <[email protected]>

* add try catch for create delete pit in minus executor

Signed-off-by: Rupal Mahajan <[email protected]>

* move all common fields to ElasticHitsExecutor

Signed-off-by: Rupal Mahajan <[email protected]>

* add javadoc for ElasticHitsExecutor

Signed-off-by: Rupal Mahajan <[email protected]>

* Add missing javadoc

Signed-off-by: Rupal Mahajan <[email protected]>

* Forcing an empty commit as last commit is stuck processing updates

Signed-off-by: Rupal Mahajan <[email protected]>

---------

Signed-off-by: Rupal Mahajan <[email protected]>

* Add pit to default cursor

Signed-off-by: Rupal Mahajan <[email protected]>

* Run CI without pit unit test

Signed-off-by: Rupal Mahajan <[email protected]>

* Rerun CI without pit unit test

Signed-off-by: Rupal Mahajan <[email protected]>

* FIx unit tests for PIT changes

Signed-off-by: Manasvini B S <[email protected]>

* Addressed comments

Signed-off-by: Manasvini B S <[email protected]>

---------

Signed-off-by: Rupal Mahajan <[email protected]>
Signed-off-by: Manasvini B S <[email protected]>
Co-authored-by: Rupal Mahajan <[email protected]>
  • Loading branch information
manasvinibs and rupal-bq authored Aug 23, 2024
1 parent 7815c96 commit 69853fe
Show file tree
Hide file tree
Showing 12 changed files with 513 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,19 @@

package org.opensearch.sql.legacy.cursor;

import static org.opensearch.core.xcontent.DeprecationHandler.IGNORE_DEPRECATIONS;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -18,6 +29,16 @@
import lombok.Setter;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.search.SearchModule;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.executor.format.Schema;

/**
Expand All @@ -40,6 +61,10 @@ public class DefaultCursor implements Cursor {
private static final String SCROLL_ID = "s";
private static final String SCHEMA_COLUMNS = "c";
private static final String FIELD_ALIAS_MAP = "a";
private static final String PIT_ID = "p";
private static final String SEARCH_REQUEST = "r";
private static final String SORT_FIELDS = "h";
private static final ObjectMapper objectMapper = new ObjectMapper();

/**
* To get mappings for index to check if type is date needed for
Expand Down Expand Up @@ -70,31 +95,85 @@ public class DefaultCursor implements Cursor {
/** To get next batch of result */
private String scrollId;

/** To get Point In Time */
private String pitId;

/** To get next batch of result with search after api */
private SearchSourceBuilder searchSourceBuilder;

/** To get last sort values * */
private Object[] sortFields;

/** To reduce the number of rows left by fetchSize */
@NonNull private Integer fetchSize;

private Integer limit;

/**
* {@link NamedXContentRegistry} from {@link SearchModule} used for construct {@link QueryBuilder}
* from DSL query string.
*/
private static final NamedXContentRegistry xContentRegistry =
new NamedXContentRegistry(
new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents());

@Override
public CursorType getType() {
return type;
}

@Override
public String generateCursorId() {
if (rowsLeft <= 0 || Strings.isNullOrEmpty(scrollId)) {
if (rowsLeft <= 0 || isCursorIdNullOrEmpty()) {
return null;
}
JSONObject json = new JSONObject();
json.put(FETCH_SIZE, fetchSize);
json.put(ROWS_LEFT, rowsLeft);
json.put(INDEX_PATTERN, indexPattern);
json.put(SCROLL_ID, scrollId);
json.put(SCHEMA_COLUMNS, getSchemaAsJson());
json.put(FIELD_ALIAS_MAP, fieldAliasMap);
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
json.put(PIT_ID, pitId);
String sortFieldValue =
AccessController.doPrivileged(
(PrivilegedAction<String>)
() -> {
try {
return objectMapper.writeValueAsString(sortFields);
} catch (JsonProcessingException e) {
throw new RuntimeException(
"Failed to parse sort fields from JSON string.", e);
}
});
json.put(SORT_FIELDS, sortFieldValue);
setSearchRequestString(json, searchSourceBuilder);
} else {
json.put(SCROLL_ID, scrollId);
}
return String.format("%s:%s", type.getId(), encodeCursor(json));
}

private void setSearchRequestString(JSONObject cursorJson, SearchSourceBuilder sourceBuilder) {
try {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
XContentBuilder builder = XContentFactory.jsonBuilder(outputStream);
sourceBuilder.toXContent(builder, null);
builder.close();

String searchRequestBase64 = Base64.getEncoder().encodeToString(outputStream.toByteArray());
cursorJson.put("searchSourceBuilder", searchRequestBase64);
} catch (IOException ex) {
throw new RuntimeException("Failed to set search request string on cursor json.", ex);
}
}

private boolean isCursorIdNullOrEmpty() {
return LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)
? Strings.isNullOrEmpty(pitId)
: Strings.isNullOrEmpty(scrollId);
}

public static DefaultCursor from(String cursorId) {
/**
* It is assumed that cursorId here is the second part of the original cursor passed by the
Expand All @@ -105,13 +184,50 @@ public static DefaultCursor from(String cursorId) {
cursor.setFetchSize(json.getInt(FETCH_SIZE));
cursor.setRowsLeft(json.getLong(ROWS_LEFT));
cursor.setIndexPattern(json.getString(INDEX_PATTERN));
cursor.setScrollId(json.getString(SCROLL_ID));
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
populateCursorForPit(json, cursor);
} else {
cursor.setScrollId(json.getString(SCROLL_ID));
}
cursor.setColumns(getColumnsFromSchema(json.getJSONArray(SCHEMA_COLUMNS)));
cursor.setFieldAliasMap(fieldAliasMap(json.getJSONObject(FIELD_ALIAS_MAP)));

return cursor;
}

private static void populateCursorForPit(JSONObject json, DefaultCursor cursor) {
cursor.setPitId(json.getString(PIT_ID));

cursor.setSortFields(getSortFieldsFromJson(json));

// Retrieve and set the SearchSourceBuilder from the JSON field
String searchSourceBuilderBase64 = json.getString("searchSourceBuilder");
byte[] bytes = Base64.getDecoder().decode(searchSourceBuilderBase64);
ByteArrayInputStream streamInput = new ByteArrayInputStream(bytes);
try {
XContentParser parser =
XContentType.JSON
.xContent()
.createParser(xContentRegistry, IGNORE_DEPRECATIONS, streamInput);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.fromXContent(parser);
cursor.setSearchSourceBuilder(sourceBuilder);
} catch (IOException ex) {
throw new RuntimeException("Failed to get searchSourceBuilder from cursor Id", ex);
}
}

private static Object[] getSortFieldsFromJson(JSONObject json) {
return AccessController.doPrivileged(
(PrivilegedAction<Object[]>)
() -> {
try {
return objectMapper.readValue(json.getString(SORT_FIELDS), Object[].class);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to parse sort fields from JSON string.", e);
}
});
}

private JSONArray getSchemaAsJson() {
JSONArray schemaJson = new JSONArray();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.legacy.executor.cursor;

import static org.opensearch.core.rest.RestStatus.OK;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import java.util.Map;
import org.apache.logging.log4j.LogManager;
Expand All @@ -18,8 +19,11 @@
import org.opensearch.rest.RestChannel;
import org.opensearch.sql.legacy.cursor.CursorType;
import org.opensearch.sql.legacy.cursor.DefaultCursor;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.legacy.pit.PointInTimeHandler;
import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl;
import org.opensearch.sql.legacy.rewriter.matchtoterm.VerificationException;

public class CursorCloseExecutor implements CursorRestExecutor {
Expand Down Expand Up @@ -79,14 +83,26 @@ public String execute(Client client, Map<String, String> params) throws Exceptio
}

private String handleDefaultCursorCloseRequest(Client client, DefaultCursor cursor) {
String scrollId = cursor.getScrollId();
ClearScrollResponse clearScrollResponse =
client.prepareClearScroll().addScrollId(scrollId).get();
if (clearScrollResponse.isSucceeded()) {
return SUCCEEDED_TRUE;
if (LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
String pitId = cursor.getPitId();
PointInTimeHandler pit = new PointInTimeHandlerImpl(client, pitId);
try {
pit.delete();
return SUCCEEDED_TRUE;
} catch (RuntimeException e) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
return SUCCEEDED_FALSE;
}
} else {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
return SUCCEEDED_FALSE;
String scrollId = cursor.getScrollId();
ClearScrollResponse clearScrollResponse =
client.prepareClearScroll().addScrollId(scrollId).get();
if (clearScrollResponse.isSucceeded()) {
return SUCCEEDED_TRUE;
} else {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
return SUCCEEDED_FALSE;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.sql.legacy.executor.cursor;

import static org.opensearch.core.rest.RestStatus.OK;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE;
import static org.opensearch.sql.common.setting.Settings.Key.SQL_PAGINATION_API_SEARCH_AFTER;

import java.util.Arrays;
import java.util.Map;
Expand All @@ -14,21 +16,25 @@
import org.json.JSONException;
import org.opensearch.OpenSearchException;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.Client;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestChannel;
import org.opensearch.search.SearchHit;
import org.opensearch.search.SearchHits;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.search.builder.PointInTimeBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.sql.legacy.cursor.CursorType;
import org.opensearch.sql.legacy.cursor.DefaultCursor;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.executor.Format;
import org.opensearch.sql.legacy.executor.format.Protocol;
import org.opensearch.sql.legacy.metrics.MetricName;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.legacy.pit.PointInTimeHandler;
import org.opensearch.sql.legacy.pit.PointInTimeHandlerImpl;
import org.opensearch.sql.legacy.rewriter.matchtoterm.VerificationException;

public class CursorResultExecutor implements CursorRestExecutor {
Expand Down Expand Up @@ -91,14 +97,27 @@ public String execute(Client client, Map<String, String> params) throws Exceptio
}

private String handleDefaultCursorRequest(Client client, DefaultCursor cursor) {
String previousScrollId = cursor.getScrollId();
LocalClusterState clusterState = LocalClusterState.state();
TimeValue scrollTimeout = clusterState.getSettingValue(Settings.Key.SQL_CURSOR_KEEP_ALIVE);
SearchResponse scrollResponse =
client.prepareSearchScroll(previousScrollId).setScroll(scrollTimeout).get();
TimeValue paginationTimeout = clusterState.getSettingValue(SQL_CURSOR_KEEP_ALIVE);

SearchResponse scrollResponse = null;
if (clusterState.getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
String pitId = cursor.getPitId();
SearchSourceBuilder source = cursor.getSearchSourceBuilder();
source.searchAfter(cursor.getSortFields());
source.pointInTimeBuilder(new PointInTimeBuilder(pitId));
SearchRequest searchRequest = new SearchRequest();
searchRequest.source(source);
scrollResponse = client.search(searchRequest).actionGet();
} else {
String previousScrollId = cursor.getScrollId();
scrollResponse =
client.prepareSearchScroll(previousScrollId).setScroll(paginationTimeout).get();
}
SearchHits searchHits = scrollResponse.getHits();
SearchHit[] searchHitArray = searchHits.getHits();
String newScrollId = scrollResponse.getScrollId();
String newPitId = scrollResponse.pointInTimeId();

int rowsLeft = (int) cursor.getRowsLeft();
int fetch = cursor.getFetchSize();
Expand All @@ -124,16 +143,37 @@ private String handleDefaultCursorRequest(Client client, DefaultCursor cursor) {

if (rowsLeft <= 0) {
/** Clear the scroll context on last page */
ClearScrollResponse clearScrollResponse =
client.prepareClearScroll().addScrollId(newScrollId).get();
if (!clearScrollResponse.isSucceeded()) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
LOG.info("Error closing the cursor context {} ", newScrollId);
if (newScrollId != null) {
ClearScrollResponse clearScrollResponse =
client.prepareClearScroll().addScrollId(newScrollId).get();
if (!clearScrollResponse.isSucceeded()) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
LOG.info("Error closing the cursor context {} ", newScrollId);
}
}
if (newPitId != null) {
PointInTimeHandler pit = new PointInTimeHandlerImpl(client, newPitId);
try {
pit.delete();
} catch (RuntimeException e) {
Metrics.getInstance().getNumericalMetric(MetricName.FAILED_REQ_COUNT_SYS).increment();
LOG.info("Error deleting point in time {} ", newPitId);
}
}
}

cursor.setRowsLeft(rowsLeft);
cursor.setScrollId(newScrollId);
if (clusterState.getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)) {
cursor.setPitId(newPitId);
cursor.setSearchSourceBuilder(cursor.getSearchSourceBuilder());
cursor.setSortFields(
scrollResponse
.getHits()
.getAt(scrollResponse.getHits().getHits().length - 1)
.getSortValues());
} else {
cursor.setScrollId(newScrollId);
}
Protocol protocol = new Protocol(client, searchHits, format.name().toLowerCase(), cursor);
return protocol.cursorFormat();
}
Expand Down
Loading

0 comments on commit 69853fe

Please sign in to comment.