Skip to content

Commit

Permalink
Addressed comments
Browse files Browse the repository at this point in the history
Signed-off-by: Manasvini B S <[email protected]>
  • Loading branch information
manasvinibs committed Aug 22, 2024
1 parent 167dce0 commit fe91e52
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
import com.google.common.base.Strings;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -26,7 +27,6 @@
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.Setter;
import lombok.SneakyThrows;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -99,7 +99,7 @@ public class DefaultCursor implements Cursor {
private String pitId;

/** To get next batch of result with search after api */
public SearchSourceBuilder searchSourceBuilder;
private SearchSourceBuilder searchSourceBuilder;

/** To get last sort values * */
private Object[] sortFields;
Expand All @@ -115,7 +115,7 @@ public class DefaultCursor implements Cursor {
*/
private static final NamedXContentRegistry xContentRegistry =
new NamedXContentRegistry(
new SearchModule(Settings.builder().build(), new ArrayList<>()).getNamedXContents());
new SearchModule(Settings.EMPTY, Collections.emptyList()).getNamedXContents());

@Override
public CursorType getType() {
Expand All @@ -124,11 +124,7 @@ public CursorType getType() {

@Override
public String generateCursorId() {
boolean isCursorValid =
LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)
? Strings.isNullOrEmpty(pitId)
: Strings.isNullOrEmpty(scrollId);
if (rowsLeft <= 0 || isCursorValid) {
if (rowsLeft <= 0 || isCursorIdNullOrEmpty()) {
return null;
}
JSONObject json = new JSONObject();
Expand All @@ -146,24 +142,44 @@ public String generateCursorId() {
try {
return objectMapper.writeValueAsString(sortFields);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
throw new RuntimeException(
"Failed to parse sort fields from JSON string.", e);
}
});
json.put(SORT_FIELDS, sortFieldValue);
} else {
json.put(SCROLL_ID, scrollId);
}
return String.format("%s:%s", type.getId(), encodeCursor(json, searchSourceBuilder));
setSearchRequestString(json, searchSourceBuilder);
return String.format("%s:%s", type.getId(), encodeCursor(json));
}

private void setSearchRequestString(JSONObject cursorJson, SearchSourceBuilder sourceBuilder) {
try {
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
XContentBuilder builder = XContentFactory.jsonBuilder(outputStream);
sourceBuilder.toXContent(builder, null);
builder.close();

String searchRequestBase64 = Base64.getEncoder().encodeToString(outputStream.toByteArray());
cursorJson.put("searchSourceBuilder", searchRequestBase64);
} catch (IOException ex) {
throw new RuntimeException("Failed to set search request string on cursor json.", ex);
}
}

private boolean isCursorIdNullOrEmpty() {
return LocalClusterState.state().getSettingValue(SQL_PAGINATION_API_SEARCH_AFTER)
? Strings.isNullOrEmpty(pitId)
: Strings.isNullOrEmpty(scrollId);
}

@SneakyThrows
public static DefaultCursor from(String cursorId) {
/**
* It is assumed that cursorId here is the second part of the original cursor passed by the
* client after removing first part which identifies cursor type
*/
String[] parts = cursorId.split(":::");
JSONObject json = decodeCursor(parts[0]);
JSONObject json = decodeCursor(cursorId);
DefaultCursor cursor = new DefaultCursor();
cursor.setFetchSize(json.getInt(FETCH_SIZE));
cursor.setRowsLeft(json.getLong(ROWS_LEFT));
Expand All @@ -178,19 +194,26 @@ public static DefaultCursor from(String cursorId) {
try {
return objectMapper.readValue(json.getString(SORT_FIELDS), Object[].class);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
throw new RuntimeException(
"Failed to parse sort fields from JSON string.", e);
}
});
cursor.setSortFields(sortFieldValue);

byte[] bytes = Base64.getDecoder().decode(parts[1]);
// Retrieve the SearchSourceBuilder from the JSON field
String searchSourceBuilderBase64 = json.getString("searchSourceBuilder");
byte[] bytes = Base64.getDecoder().decode(searchSourceBuilderBase64);
ByteArrayInputStream streamInput = new ByteArrayInputStream(bytes);
XContentParser parser =
XContentType.JSON
.xContent()
.createParser(xContentRegistry, IGNORE_DEPRECATIONS, streamInput);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.fromXContent(parser);
cursor.searchSourceBuilder = sourceBuilder;
try {
XContentParser parser =
XContentType.JSON
.xContent()
.createParser(xContentRegistry, IGNORE_DEPRECATIONS, streamInput);
SearchSourceBuilder sourceBuilder = SearchSourceBuilder.fromXContent(parser);
cursor.setSearchSourceBuilder(sourceBuilder);
} catch (IOException ex) {
throw new RuntimeException("Failed to get searchSourceBuilder from cursor Id", ex);
}
} else {
cursor.setScrollId(json.getString(SCROLL_ID));
}
Expand Down Expand Up @@ -220,18 +243,8 @@ private JSONObject schemaEntry(String name, String alias, String type) {
return entry;
}

@SneakyThrows
private static String encodeCursor(JSONObject cursorJson, SearchSourceBuilder sourceBuilder) {
String jsonBase64 = Base64.getEncoder().encodeToString(cursorJson.toString().getBytes());

ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
XContentBuilder builder = XContentFactory.jsonBuilder(outputStream);
sourceBuilder.toXContent(builder, null);
builder.close();

String searchRequestBase64 = Base64.getEncoder().encodeToString(outputStream.toByteArray());

return jsonBase64 + ":::" + searchRequestBase64;
private static String encodeCursor(JSONObject cursorJson) {
return Base64.getEncoder().encodeToString(cursorJson.toString().getBytes());
}

private static JSONObject decodeCursor(String cursorId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,26 @@

import static org.opensearch.sql.common.setting.Settings.Key.SQL_CURSOR_KEEP_ALIVE;

import java.util.concurrent.ExecutionException;
import lombok.Getter;
import lombok.Setter;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.client.Client;
import org.opensearch.core.action.ActionListener;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;

/** Handler for Point In Time */
public class PointInTimeHandlerImpl implements PointInTimeHandler {
private Client client;
private String[] indices;
@Getter @Setter private String pitId;
private Boolean deleteStatus = null;
private Boolean createStatus = null;
private static final Logger LOG = LogManager.getLogger();

/**
Expand Down Expand Up @@ -60,31 +61,16 @@ public boolean create() {
CreatePitRequest createPitRequest =
new CreatePitRequest(
LocalClusterState.state().getSettingValue(SQL_CURSOR_KEEP_ALIVE), false, indices);
client.createPit(
createPitRequest,
new ActionListener<>() {

@Override
public void onResponse(CreatePitResponse createPitResponse) {
pitId = createPitResponse.getId();
createStatus = true;
LOG.info("Created Point In Time {} successfully.", pitId);
}

@Override
public void onFailure(Exception e) {
createStatus = false;
LOG.error("Error occurred while creating PIT", e);
}
});
while (createStatus == null) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.error("Error occurred while creating PIT", e);
}
ActionFuture<CreatePitResponse> execute =
client.execute(CreatePitAction.INSTANCE, createPitRequest);
try {
CreatePitResponse pitResponse = execute.get();
pitId = pitResponse.getId();
LOG.info("Created Point In Time {} successfully.", pitId);
return true;
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Error occurred while creating PIT.", e);
}
return createStatus;
}

/**
Expand All @@ -95,32 +81,14 @@ public void onFailure(Exception e) {
@Override
public boolean delete() {
DeletePitRequest deletePitRequest = new DeletePitRequest(pitId);
client.deletePits(
deletePitRequest,
new ActionListener<>() {
@Override
public void onResponse(DeletePitResponse deletePitResponse) {
deleteStatus = true;
LOG.info(
"Delete Point In Time {} status: {}",
pitId,
deletePitResponse.status().getStatus());
}

@Override
public void onFailure(Exception e) {
deleteStatus = false;
LOG.error("Error occurred while deleting PIT", e);
}
});

while (deleteStatus == null) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.error("Error occurred while deleting PIT", e);
}
ActionFuture<DeletePitResponse> execute =
client.execute(DeletePitAction.INSTANCE, deletePitRequest);
try {
DeletePitResponse deletePitResponse = execute.get();
LOG.info("Delete Point In Time {} status: {}", pitId, deletePitResponse.status().getStatus());
return true;
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("Error occurred while deleting PIT.", e);
}
return deleteStatus;
}
}
Loading

0 comments on commit fe91e52

Please sign in to comment.