Skip to content

Commit

Permalink
Added rest layer changes for List all PITs and PIT segments (#4388)
Browse files Browse the repository at this point in the history
* Changes for list all and pit segments

Signed-off-by: Bharathwaj G <[email protected]>
(cherry picked from commit bb47419)
  • Loading branch information
bharath-techie committed Sep 27, 2022
1 parent a9558e3 commit 2b793dc
Show file tree
Hide file tree
Showing 29 changed files with 655 additions and 231 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085))
- Point in time rest layer changes for create and delete PIT API ([#4064](https://github.com/opensearch-project/OpenSearch/pull/4064))
- Add failover support with Segment Replication enabled. ([#4325](https://github.com/opensearch-project/OpenSearch/pull/4325)
- Point in time rest layer changes for list PIT and PIT segments API ([#4388](https://github.com/opensearch-project/OpenSearch/pull/4388))
- Added @dreamer-89 as an Opensearch maintainer ([#4342](https://github.com/opensearch-project/OpenSearch/pull/4342))
- Added release notes for 1.3.5 ([#4343](https://github.com/opensearch-project/OpenSearch/pull/4343))
- Added release notes for 2.2.1 ([#4344](https://github.com/opensearch-project/OpenSearch/pull/4344))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,10 @@ static Request deleteAllPits() {
return new Request(HttpDelete.METHOD_NAME, "/_search/point_in_time/_all");
}

static Request getAllPits() {
return new Request(HttpGet.METHOD_NAME, "/_search/point_in_time/_all");
}

static Request multiSearch(MultiSearchRequest multiSearchRequest) throws IOException {
Request request = new Request(HttpPost.METHOD_NAME, "/_msearch");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.GetAllPitNodesResponse;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
Expand Down Expand Up @@ -1368,6 +1369,40 @@ public final Cancellable deleteAllPitsAsync(RequestOptions options, ActionListen
);
}

/**
* Get all point in time searches using list all PITs API
*
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final GetAllPitNodesResponse getAllPits(RequestOptions options) throws IOException {
return performRequestAndParseEntity(
new MainRequest(),
(request) -> RequestConverters.getAllPits(),
options,
GetAllPitNodesResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously get all point in time searches using list all PITs API
*
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return the response
*/
public final Cancellable getAllPitsAsync(RequestOptions options, ActionListener<GetAllPitNodesResponse> listener) {
return performRequestAsyncAndParseEntity(
new MainRequest(),
(request) -> RequestConverters.getAllPits(),
options,
GetAllPitNodesResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Clears one or more scroll ids using the Clear Scroll API.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import org.opensearch.action.search.DeletePitInfo;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.GetAllPitNodesResponse;
import org.opensearch.common.unit.TimeValue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Tests point in time API with rest high level client
Expand Down Expand Up @@ -52,21 +54,24 @@ public void indexDocuments() throws IOException {

public void testCreateAndDeletePit() throws IOException {
CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index");
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(pitResponse.getId() != null);
assertEquals(1, pitResponse.getTotalShards());
assertEquals(1, pitResponse.getSuccessfulShards());
assertEquals(0, pitResponse.getFailedShards());
assertEquals(0, pitResponse.getSkippedShards());
CreatePitResponse createPitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(createPitResponse.getId() != null);
assertEquals(1, createPitResponse.getTotalShards());
assertEquals(1, createPitResponse.getSuccessfulShards());
assertEquals(0, createPitResponse.getFailedShards());
assertEquals(0, createPitResponse.getSkippedShards());
GetAllPitNodesResponse getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT);
List<String> pits = getAllPitResponse.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList());
assertTrue(pits.contains(createPitResponse.getId()));
List<String> pitIds = new ArrayList<>();
pitIds.add(pitResponse.getId());
pitIds.add(createPitResponse.getId());
DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds);
DeletePitResponse deletePitResponse = execute(deletePitRequest, highLevelClient()::deletePit, highLevelClient()::deletePitAsync);
assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful());
assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(pitResponse.getId()));
assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(createPitResponse.getId()));
}

public void testDeleteAllPits() throws IOException {
public void testDeleteAllAndListAllPits() throws IOException {
CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index");
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
CreatePitResponse pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
Expand All @@ -80,6 +85,11 @@ public void testDeleteAllPits() throws IOException {
pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(pitResponse.getId() != null);
assertTrue(pitResponse1.getId() != null);
GetAllPitNodesResponse getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT);

List<String> pits = getAllPitResponse.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList());
assertTrue(pits.contains(pitResponse.getId()));
assertTrue(pits.contains(pitResponse1.getId()));
ActionListener<DeletePitResponse> deletePitListener = new ActionListener<>() {
@Override
public void onResponse(DeletePitResponse response) {
Expand All @@ -95,8 +105,27 @@ public void onFailure(Exception e) {
}
}
};
final CreatePitResponse pitResponse3 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);

ActionListener<GetAllPitNodesResponse> getPitsListener = new ActionListener<GetAllPitNodesResponse>() {
@Override
public void onResponse(GetAllPitNodesResponse response) {
List<String> pits = response.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList());
assertTrue(pits.contains(pitResponse3.getId()));
}

@Override
public void onFailure(Exception e) {
if (!(e instanceof OpenSearchStatusException)) {
throw new AssertionError("List all PITs failed", e);
}
}
};
highLevelClient().getAllPitsAsync(RequestOptions.DEFAULT, getPitsListener);
highLevelClient().deleteAllPitsAsync(RequestOptions.DEFAULT, deletePitListener);
// validate no pits case
getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT);
assertTrue(getAllPitResponse.getPitInfos().size() == 0);
highLevelClient().deleteAllPitsAsync(RequestOptions.DEFAULT, deletePitListener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ public class RestHighLevelClientTests extends OpenSearchTestCase {
"ping",
"info",
"delete_all_pits",
"get_all_pits",
// security
"security.get_ssl_certificates",
"security.authenticate",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"get_all_pits":{
"documentation":{
"url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/",
"description":"Lists all active point in time searches."
},
"stability":"stable",
"url":{
"paths":[
{
"path":"/_search/point_in_time/_all",
"methods":[
"GET"
]
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
---
"Help":
- skip:
version: " - 2.2.99"
reason: point in time stats were added in 2.3.0
version: " - 2.3.99"
reason: point in time stats were added in 2.4.0
features: node_selector
- do:
cat.shards:
help: true
node_selector:
version: "2.3.0 - "
version: "2.4.0 - "

- match:
$body: |
Expand Down Expand Up @@ -90,14 +90,14 @@
---
"Help before - 2.3.0":
- skip:
version: "2.3.0 - "
reason: point in time stats were added in 2.3.0
version: "2.4.0 - "
reason: point in time stats were added in 2.4.0
features: node_selector
- do:
cat.shards:
help: true
node_selector:
version: " - 2.2.99"
version: " - 2.3.99"

- match:
$body: |
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"Create PIT, Search with PIT ID and Delete":
- skip:
version: " - 2.2.99"
reason: "mode to be introduced later than 2.3"
version: " - 2.3.99"
reason: "mode to be introduced later than 2.4"
- do:
indices.create:
index: test_pit
Expand Down Expand Up @@ -79,6 +79,12 @@
- match: {hits.total: 3 }
- length: {hits.hits: 1 }

- do:
get_all_pits: {}

- match: {pits.0.pit_id: $pit_id}
- match: {pits.0.keep_alive: 82800000 }

- do:
delete_pit:
body:
Expand All @@ -90,8 +96,8 @@
---
"Delete all":
- skip:
version: " - 2.2.99"
reason: "mode to be introduced later than 2.3"
version: " - 2.3.99"
reason: "mode to be introduced later than 2.4"
- do:
indices.create:
index: test_pit
Expand Down Expand Up @@ -119,6 +125,12 @@
- set: {pit_id: pit_id}
- match: { _shards.failed: 0}

- do:
get_all_pits: {}

- match: {pits.0.pit_id: $pit_id}
- match: {pits.0.keep_alive: 82800000 }

- do:
delete_all_pits: {}

Expand Down
11 changes: 6 additions & 5 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -238,16 +238,14 @@
import org.opensearch.action.search.ClearScrollAction;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.GetAllPitsAction;
import org.opensearch.action.search.MultiSearchAction;
import org.opensearch.action.search.NodesGetAllPitsAction;
import org.opensearch.action.search.GetAllPitsAction;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportCreatePitAction;
import org.opensearch.action.search.TransportDeletePitAction;
import org.opensearch.action.search.TransportGetAllPitsAction;
import org.opensearch.action.search.TransportNodesGetAllPitsAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
Expand Down Expand Up @@ -385,6 +383,7 @@
import org.opensearch.rest.action.cat.RestClusterManagerAction;
import org.opensearch.rest.action.cat.RestNodeAttrsAction;
import org.opensearch.rest.action.cat.RestNodesAction;
import org.opensearch.rest.action.cat.RestPitSegmentsAction;
import org.opensearch.rest.action.cat.RestPluginsAction;
import org.opensearch.rest.action.cat.RestRepositoriesAction;
import org.opensearch.rest.action.cat.RestSegmentsAction;
Expand Down Expand Up @@ -413,6 +412,7 @@
import org.opensearch.rest.action.search.RestCreatePitAction;
import org.opensearch.rest.action.search.RestDeletePitAction;
import org.opensearch.rest.action.search.RestExplainAction;
import org.opensearch.rest.action.search.RestGetAllPitsAction;
import org.opensearch.rest.action.search.RestMultiSearchAction;
import org.opensearch.rest.action.search.RestSearchAction;
import org.opensearch.rest.action.search.RestSearchScrollAction;
Expand Down Expand Up @@ -678,10 +678,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg

// point in time actions
actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class);
actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class);
actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class);
actions.register(PitSegmentsAction.INSTANCE, TransportPitSegmentsAction.class);
actions.register(NodesGetAllPitsAction.INSTANCE, TransportNodesGetAllPitsAction.class);
actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class);

return unmodifiableMap(actions.getRegistry());
}
Expand Down Expand Up @@ -858,6 +857,8 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
// Point in time API
registerHandler.accept(new RestCreatePitAction());
registerHandler.accept(new RestDeletePitAction());
registerHandler.accept(new RestGetAllPitsAction(nodesInCluster));
registerHandler.accept(new RestPitSegmentsAction(nodesInCluster));

for (ActionPlugin plugin : actionPlugins) {
for (RestHandler handler : plugin.getRestHandlers(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.common.Strings;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.xcontent.XContentParser;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -84,4 +85,37 @@ public ActionRequestValidationException validate() {
}
return validationException;
}

public void fromXContent(XContentParser parser) throws IOException {
pitIds.clear();
if (parser.nextToken() != XContentParser.Token.START_OBJECT) {
throw new IllegalArgumentException("Malformed content, must start with an object");
} else {
XContentParser.Token token;
String currentFieldName = null;
while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) {
if (token == XContentParser.Token.FIELD_NAME) {
currentFieldName = parser.currentName();
} else if ("pit_id".equals(currentFieldName)) {
if (token == XContentParser.Token.START_ARRAY) {
while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) {
if (token.isValue() == false) {
throw new IllegalArgumentException("pit_id array element should only contain PIT identifier");
}
pitIds.add(parser.text());
}
} else {
if (token.isValue() == false) {
throw new IllegalArgumentException("pit_id element should only contain PIT identifier");
}
pitIds.add(parser.text());
}
} else {
throw new IllegalArgumentException(
"Unknown parameter [" + currentFieldName + "] in request body or parameter is of the wrong type[" + token + "] "
);
}
}
}
}
}
Loading

0 comments on commit 2b793dc

Please sign in to comment.