From 689a2c44eee85b025c95516c4949d3a1bc3ec284 Mon Sep 17 00:00:00 2001 From: Bharathwaj G <58062316+bharath-techie@users.noreply.github.com> Date: Thu, 1 Sep 2022 10:58:45 +0530 Subject: [PATCH] Add changes for Create PIT and Delete PIT rest layer and rest high level client (#4064) * Create and delete PIT search rest layer changes Signed-off-by: Bharathwaj G --- CHANGELOG.md | 1 + .../opensearch/client/RequestConverters.java | 38 ++++- .../client/RestHighLevelClient.java | 118 ++++++++++++++++ .../java/org/opensearch/client/PitIT.java | 102 ++++++++++++++ .../client/RequestConvertersTests.java | 44 ++++++ .../client/RestHighLevelClientTests.java | 1 + .../java/org/opensearch/client/SearchIT.java | 47 +++++++ .../rest-api-spec/api/create_pit.json | 44 ++++++ .../rest-api-spec/api/delete_all_pits.json | 19 +++ .../rest-api-spec/api/delete_pit.json | 23 +++ .../rest-api-spec/test/pit/10_basic.yml | 130 +++++++++++++++++ .../org/opensearch/action/ActionModule.java | 10 ++ .../action/search/DeletePitInfo.java | 4 +- .../action/search/DeletePitRequest.java | 5 + .../action/search/GetAllPitNodesRequest.java | 11 ++ .../action/search/GetAllPitNodesResponse.java | 8 ++ .../action/search/NodesGetAllPitsAction.java | 23 +++ .../opensearch/action/search/PitService.java | 23 ++- .../search/TransportDeletePitAction.java | 6 +- .../search/TransportGetAllPitsAction.java | 80 +++-------- .../TransportNodesGetAllPitsAction.java | 86 +++++++++++ .../action/search/RestCreatePitAction.java | 57 ++++++++ .../action/search/RestDeletePitAction.java | 60 ++++++++ .../org/opensearch/search/SearchService.java | 4 +- .../search/CreatePitControllerTests.java | 11 +- .../search/TransportDeletePitActionTests.java | 18 +-- .../search/CreatePitSingleNodeTests.java | 52 +++++++ .../search/pit/RestCreatePitActionTests.java | 78 ++++++++++ .../search/pit/RestDeletePitActionTests.java | 133 ++++++++++++++++++ 29 files changed, 1152 insertions(+), 84 deletions(-) create mode 100644 client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/create_pit.json create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/delete_all_pits.json create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/delete_pit.json create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml create mode 100644 server/src/main/java/org/opensearch/action/search/NodesGetAllPitsAction.java create mode 100644 server/src/main/java/org/opensearch/action/search/TransportNodesGetAllPitsAction.java create mode 100644 server/src/main/java/org/opensearch/rest/action/search/RestCreatePitAction.java create mode 100644 server/src/main/java/org/opensearch/rest/action/search/RestDeletePitAction.java create mode 100644 server/src/test/java/org/opensearch/search/pit/RestCreatePitActionTests.java create mode 100644 server/src/test/java/org/opensearch/search/pit/RestDeletePitActionTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 07b08d853cf45..f89e7eba0698c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ## [Unreleased] ### Added - Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085)) +- Point in time rest layer changes for create and delete PIT API ([#4064](https://github.com/opensearch-project/OpenSearch/pull/4064)) - Added @dreamer-89 as an Opensearch maintainer ([#4342](https://github.com/opensearch-project/OpenSearch/pull/4342)) - Added release notes for 1.3.5 ([#4343](https://github.com/opensearch-project/OpenSearch/pull/4343)) - Added release notes for 2.2.1 ([#4344](https://github.com/opensearch-project/OpenSearch/pull/4344)) diff --git a/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java index 6fa57295f48e4..eedc27d1d2ea7 100644 --- a/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java @@ -54,6 +54,8 @@ import org.opensearch.action.get.MultiGetRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.search.ClearScrollRequest; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.DeletePitRequest; import org.opensearch.action.search.MultiSearchRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchScrollRequest; @@ -92,6 +94,7 @@ import org.opensearch.index.reindex.ReindexRequest; import org.opensearch.index.reindex.UpdateByQueryRequest; import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.rest.action.search.RestCreatePitAction; import org.opensearch.rest.action.search.RestSearchAction; import org.opensearch.script.mustache.MultiSearchTemplateRequest; import org.opensearch.script.mustache.SearchTemplateRequest; @@ -433,9 +436,19 @@ static void addSearchRequestParams(Params params, SearchRequest searchRequest) { params.putParam(RestSearchAction.TYPED_KEYS_PARAM, "true"); params.withRouting(searchRequest.routing()); params.withPreference(searchRequest.preference()); - params.withIndicesOptions(searchRequest.indicesOptions()); + if (searchRequest.pointInTimeBuilder() == null) { + params.withIndicesOptions(searchRequest.indicesOptions()); + } params.withSearchType(searchRequest.searchType().name().toLowerCase(Locale.ROOT)); - params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips())); + /** + * Merging search responses as part of CCS flow to reduce roundtrips is not supported for point in time - + * refer to org.opensearch.action.search.SearchResponseMerger + */ + if (searchRequest.pointInTimeBuilder() != null) { + params.putParam("ccs_minimize_roundtrips", "false"); + } else { + params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips())); + } if (searchRequest.getPreFilterShardSize() != null) { params.putParam("pre_filter_shard_size", Integer.toString(searchRequest.getPreFilterShardSize())); } @@ -464,6 +477,27 @@ static Request clearScroll(ClearScrollRequest clearScrollRequest) throws IOExcep return request; } + static Request createPit(CreatePitRequest createPitRequest) throws IOException { + Params params = new Params(); + params.putParam(RestCreatePitAction.ALLOW_PARTIAL_PIT_CREATION, Boolean.toString(createPitRequest.shouldAllowPartialPitCreation())); + params.putParam(RestCreatePitAction.KEEP_ALIVE, createPitRequest.getKeepAlive()); + params.withIndicesOptions(createPitRequest.indicesOptions()); + Request request = new Request(HttpPost.METHOD_NAME, endpoint(createPitRequest.indices(), "_search/point_in_time")); + request.addParameters(params.asMap()); + request.setEntity(createEntity(createPitRequest, REQUEST_BODY_CONTENT_TYPE)); + return request; + } + + static Request deletePit(DeletePitRequest deletePitRequest) throws IOException { + Request request = new Request(HttpDelete.METHOD_NAME, "/_search/point_in_time"); + request.setEntity(createEntity(deletePitRequest, REQUEST_BODY_CONTENT_TYPE)); + return request; + } + + static Request deleteAllPits() { + return new Request(HttpDelete.METHOD_NAME, "/_search/point_in_time/_all"); + } + static Request multiSearch(MultiSearchRequest multiSearchRequest) throws IOException { Request request = new Request(HttpPost.METHOD_NAME, "/_msearch"); diff --git a/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java index 28a441bdf7f7f..0c73c65f6175f 100644 --- a/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java @@ -59,6 +59,10 @@ import org.opensearch.action.index.IndexResponse; import org.opensearch.action.search.ClearScrollRequest; import org.opensearch.action.search.ClearScrollResponse; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.DeletePitRequest; +import org.opensearch.action.search.DeletePitResponse; import org.opensearch.action.search.MultiSearchRequest; import org.opensearch.action.search.MultiSearchResponse; import org.opensearch.action.search.SearchRequest; @@ -1250,6 +1254,120 @@ public final Cancellable scrollAsync( ); } + /** + * Create PIT context using create PIT API + * + * @param createPitRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + */ + public final CreatePitResponse createPit(CreatePitRequest createPitRequest, RequestOptions options) throws IOException { + return performRequestAndParseEntity( + createPitRequest, + RequestConverters::createPit, + options, + CreatePitResponse::fromXContent, + emptySet() + ); + } + + /** + * Asynchronously Create PIT context using create PIT API + * + * @param createPitRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + * @return the response + */ + public final Cancellable createPitAsync( + CreatePitRequest createPitRequest, + RequestOptions options, + ActionListener listener + ) { + return performRequestAsyncAndParseEntity( + createPitRequest, + RequestConverters::createPit, + options, + CreatePitResponse::fromXContent, + listener, + emptySet() + ); + } + + /** + * Delete point in time searches using delete PIT API + * + * @param deletePitRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + */ + public final DeletePitResponse deletePit(DeletePitRequest deletePitRequest, RequestOptions options) throws IOException { + return performRequestAndParseEntity( + deletePitRequest, + RequestConverters::deletePit, + options, + DeletePitResponse::fromXContent, + emptySet() + ); + } + + /** + * Asynchronously Delete point in time searches using delete PIT API + * + * @param deletePitRequest the request + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + * @return the response + */ + public final Cancellable deletePitAsync( + DeletePitRequest deletePitRequest, + RequestOptions options, + ActionListener listener + ) { + return performRequestAsyncAndParseEntity( + deletePitRequest, + RequestConverters::deletePit, + options, + DeletePitResponse::fromXContent, + listener, + emptySet() + ); + } + + /** + * Delete all point in time searches using delete all PITs API + * + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @return the response + */ + public final DeletePitResponse deleteAllPits(RequestOptions options) throws IOException { + return performRequestAndParseEntity( + new MainRequest(), + (request) -> RequestConverters.deleteAllPits(), + options, + DeletePitResponse::fromXContent, + emptySet() + ); + } + + /** + * Asynchronously Delete all point in time searches using delete all PITs API + * + * @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized + * @param listener the listener to be notified upon request completion + * @return the response + */ + public final Cancellable deleteAllPitsAsync(RequestOptions options, ActionListener listener) { + return performRequestAsyncAndParseEntity( + new MainRequest(), + (request) -> RequestConverters.deleteAllPits(), + options, + DeletePitResponse::fromXContent, + listener, + emptySet() + ); + } + /** * Clears one or more scroll ids using the Clear Scroll API. * diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java b/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java new file mode 100644 index 0000000000000..395ec6e46a7b3 --- /dev/null +++ b/client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java @@ -0,0 +1,102 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client; + +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.junit.Before; +import org.opensearch.OpenSearchStatusException; +import org.opensearch.action.ActionListener; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.DeletePitInfo; +import org.opensearch.action.search.DeletePitRequest; +import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.common.unit.TimeValue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Tests point in time API with rest high level client + */ +public class PitIT extends OpenSearchRestHighLevelClientTestCase { + + @Before + public void indexDocuments() throws IOException { + Request doc1 = new Request(HttpPut.METHOD_NAME, "/index/_doc/1"); + doc1.setJsonEntity("{\"type\":\"type1\", \"id\":1, \"num\":10, \"num2\":50}"); + client().performRequest(doc1); + Request doc2 = new Request(HttpPut.METHOD_NAME, "/index/_doc/2"); + doc2.setJsonEntity("{\"type\":\"type1\", \"id\":2, \"num\":20, \"num2\":40}"); + client().performRequest(doc2); + Request doc3 = new Request(HttpPut.METHOD_NAME, "/index/_doc/3"); + doc3.setJsonEntity("{\"type\":\"type1\", \"id\":3, \"num\":50, \"num2\":35}"); + client().performRequest(doc3); + Request doc4 = new Request(HttpPut.METHOD_NAME, "/index/_doc/4"); + doc4.setJsonEntity("{\"type\":\"type2\", \"id\":4, \"num\":100, \"num2\":10}"); + client().performRequest(doc4); + Request doc5 = new Request(HttpPut.METHOD_NAME, "/index/_doc/5"); + doc5.setJsonEntity("{\"type\":\"type2\", \"id\":5, \"num\":100, \"num2\":10}"); + client().performRequest(doc5); + client().performRequest(new Request(HttpPost.METHOD_NAME, "/_refresh")); + } + + public void testCreateAndDeletePit() throws IOException { + CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index"); + CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); + assertTrue(pitResponse.getId() != null); + assertEquals(1, pitResponse.getTotalShards()); + assertEquals(1, pitResponse.getSuccessfulShards()); + assertEquals(0, pitResponse.getFailedShards()); + assertEquals(0, pitResponse.getSkippedShards()); + List pitIds = new ArrayList<>(); + pitIds.add(pitResponse.getId()); + DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds); + DeletePitResponse deletePitResponse = execute(deletePitRequest, highLevelClient()::deletePit, highLevelClient()::deletePitAsync); + assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful()); + assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(pitResponse.getId())); + } + + public void testDeleteAllPits() throws IOException { + CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index"); + CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); + CreatePitResponse pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); + assertTrue(pitResponse.getId() != null); + assertTrue(pitResponse1.getId() != null); + DeletePitResponse deletePitResponse = highLevelClient().deleteAllPits(RequestOptions.DEFAULT); + for (DeletePitInfo deletePitInfo : deletePitResponse.getDeletePitResults()) { + assertTrue(deletePitInfo.isSuccessful()); + } + pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); + pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); + assertTrue(pitResponse.getId() != null); + assertTrue(pitResponse1.getId() != null); + ActionListener deletePitListener = new ActionListener<>() { + @Override + public void onResponse(DeletePitResponse response) { + for (DeletePitInfo deletePitInfo : response.getDeletePitResults()) { + assertTrue(deletePitInfo.isSuccessful()); + } + } + + @Override + public void onFailure(Exception e) { + if (!(e instanceof OpenSearchStatusException)) { + throw new AssertionError("Delete all failed"); + } + } + }; + highLevelClient().deleteAllPitsAsync(RequestOptions.DEFAULT, deletePitListener); + // validate no pits case + highLevelClient().deleteAllPitsAsync(RequestOptions.DEFAULT, deletePitListener); + } +} diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/RequestConvertersTests.java index 97c0f2f475826..ee5795deb165d 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/RequestConvertersTests.java @@ -53,6 +53,8 @@ import org.opensearch.action.get.MultiGetRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.search.ClearScrollRequest; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.DeletePitRequest; import org.opensearch.action.search.MultiSearchRequest; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchScrollRequest; @@ -131,6 +133,7 @@ import java.util.Locale; import java.util.Map; import java.util.StringJoiner; +import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -1303,6 +1306,47 @@ public void testClearScroll() throws IOException { assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue()); } + public void testCreatePit() throws IOException { + String[] indices = randomIndicesNames(0, 5); + Map expectedParams = new HashMap<>(); + expectedParams.put("keep_alive", "1d"); + expectedParams.put("allow_partial_pit_creation", "true"); + CreatePitRequest createPitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, indices); + setRandomIndicesOptions(createPitRequest::indicesOptions, createPitRequest::indicesOptions, expectedParams); + Request request = RequestConverters.createPit(createPitRequest); + StringJoiner endpoint = new StringJoiner("/", "/", ""); + String index = String.join(",", indices); + if (Strings.hasLength(index)) { + endpoint.add(index); + } + endpoint.add("_search/point_in_time"); + assertEquals(HttpPost.METHOD_NAME, request.getMethod()); + assertEquals(endpoint.toString(), request.getEndpoint()); + assertEquals(expectedParams, request.getParameters()); + assertToXContentBody(createPitRequest, request.getEntity()); + assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue()); + } + + public void testDeletePit() throws IOException { + List pitIdsList = new ArrayList<>(); + pitIdsList.add("pitId1"); + pitIdsList.add("pitId2"); + DeletePitRequest deletePitRequest = new DeletePitRequest(pitIdsList); + Request request = RequestConverters.deletePit(deletePitRequest); + String endpoint = "/_search/point_in_time"; + assertEquals(HttpDelete.METHOD_NAME, request.getMethod()); + assertEquals(endpoint, request.getEndpoint()); + assertToXContentBody(deletePitRequest, request.getEntity()); + assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue()); + } + + public void testDeleteAllPits() { + Request request = RequestConverters.deleteAllPits(); + String endpoint = "/_search/point_in_time/_all"; + assertEquals(HttpDelete.METHOD_NAME, request.getMethod()); + assertEquals(endpoint, request.getEndpoint()); + } + public void testSearchTemplate() throws Exception { // Create a random request. String[] indices = randomIndicesNames(0, 5); diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java index 3da0f81023f72..cdd63743f2644 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java @@ -134,6 +134,7 @@ public class RestHighLevelClientTests extends OpenSearchTestCase { // core "ping", "info", + "delete_all_pits", // security "security.get_ssl_certificates", "security.authenticate", diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/SearchIT.java b/client/rest-high-level/src/test/java/org/opensearch/client/SearchIT.java index 19e287fb91be5..8b509e5d19e92 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/SearchIT.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/SearchIT.java @@ -43,6 +43,10 @@ import org.opensearch.action.fieldcaps.FieldCapabilitiesResponse; import org.opensearch.action.search.ClearScrollRequest; import org.opensearch.action.search.ClearScrollResponse; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.DeletePitRequest; +import org.opensearch.action.search.DeletePitResponse; import org.opensearch.action.search.MultiSearchRequest; import org.opensearch.action.search.MultiSearchResponse; import org.opensearch.action.search.SearchRequest; @@ -89,6 +93,7 @@ import org.opensearch.search.aggregations.metrics.WeightedAvgAggregationBuilder; import org.opensearch.search.aggregations.support.MultiValuesSourceFieldConfig; import org.opensearch.search.aggregations.support.ValueType; +import org.opensearch.search.builder.PointInTimeBuilder; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.fetch.subphase.FetchSourceContext; import org.opensearch.search.fetch.subphase.highlight.HighlightBuilder; @@ -100,11 +105,13 @@ import org.junit.Before; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertToXContentEquivalent; @@ -762,6 +769,46 @@ public void testSearchScroll() throws Exception { } } + public void testSearchWithPit() throws Exception { + for (int i = 0; i < 100; i++) { + XContentBuilder builder = jsonBuilder().startObject().field("field", i).endObject(); + Request doc = new Request(HttpPut.METHOD_NAME, "/test/_doc/" + Integer.toString(i)); + doc.setJsonEntity(Strings.toString(builder)); + client().performRequest(doc); + } + client().performRequest(new Request(HttpPost.METHOD_NAME, "/test/_refresh")); + + CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "test"); + CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync); + + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(35) + .sort("field", SortOrder.ASC) + .pointInTimeBuilder(new PointInTimeBuilder(pitResponse.getId())); + SearchRequest searchRequest = new SearchRequest().source(searchSourceBuilder); + SearchResponse searchResponse = execute(searchRequest, highLevelClient()::search, highLevelClient()::searchAsync); + + try { + long counter = 0; + assertSearchHeader(searchResponse); + assertThat(searchResponse.getHits().getTotalHits().value, equalTo(100L)); + assertThat(searchResponse.getHits().getHits().length, equalTo(35)); + for (SearchHit hit : searchResponse.getHits()) { + assertThat(((Number) hit.getSortValues()[0]).longValue(), equalTo(counter++)); + } + } finally { + List pitIds = new ArrayList<>(); + pitIds.add(pitResponse.getId()); + DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds); + DeletePitResponse deletePitResponse = execute( + deletePitRequest, + highLevelClient()::deletePit, + highLevelClient()::deletePitAsync + ); + assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful()); + assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(pitResponse.getId())); + } + } + public void testMultiSearch() throws Exception { MultiSearchRequest multiSearchRequest = new MultiSearchRequest(); SearchRequest searchRequest1 = new SearchRequest("index1"); diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/create_pit.json b/rest-api-spec/src/main/resources/rest-api-spec/api/create_pit.json new file mode 100644 index 0000000000000..d3a2104c01bc0 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/create_pit.json @@ -0,0 +1,44 @@ + +{ + "create_pit":{ + "documentation":{ + "url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/", + "description":"Creates point in time context." + }, + "stability":"stable", + "url":{ + "paths":[ + { + "path":"/{index}/_search/point_in_time", + "methods":[ + "POST" + ], + "parts":{ + "index":{ + "type":"list", + "description":"A comma-separated list of index names to search; use `_all` or empty string to perform the operation on all indices" + } + } + } + ] + }, + "params":{ + "allow_partial_pit_creation":{ + "type":"boolean", + "description":"Allow if point in time can be created with partial failures" + }, + "keep_alive":{ + "type":"string", + "description":"Specify the keep alive for point in time" + }, + "preference":{ + "type":"string", + "description":"Specify the node or shard the operation should be performed on (default: random)" + }, + "routing":{ + "type":"list", + "description":"A comma-separated list of specific routing values" + } + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/delete_all_pits.json b/rest-api-spec/src/main/resources/rest-api-spec/api/delete_all_pits.json new file mode 100644 index 0000000000000..5ff01aa746df9 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/delete_all_pits.json @@ -0,0 +1,19 @@ +{ + "delete_all_pits":{ + "documentation":{ + "url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/", + "description":"Deletes all active point in time searches." + }, + "stability":"stable", + "url":{ + "paths":[ + { + "path":"/_search/point_in_time/_all", + "methods":[ + "DELETE" + ] + } + ] + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/delete_pit.json b/rest-api-spec/src/main/resources/rest-api-spec/api/delete_pit.json new file mode 100644 index 0000000000000..b54d9f76204f4 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/delete_pit.json @@ -0,0 +1,23 @@ +{ + "delete_pit":{ + "documentation":{ + "url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/", + "description":"Deletes one or more point in time searches based on the IDs passed." + }, + "stability":"stable", + "url":{ + "paths":[ + { + "path":"/_search/point_in_time", + "methods":[ + "DELETE" + ] + } + ] + }, + "body":{ + "description":"A comma-separated list of pit IDs to clear", + "required":true + } + } +} diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml new file mode 100644 index 0000000000000..2023bcc8f5c87 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/pit/10_basic.yml @@ -0,0 +1,130 @@ +"Create PIT, Search with PIT ID and Delete": + - skip: + version: " - 2.9.99" + reason: "mode to be introduced later than 3.0" + - do: + indices.create: + index: test_pit + - do: + index: + index: test_pit + id: 42 + body: { foo: 1 } + + - do: + index: + index: test_pit + id: 43 + body: { foo: 2 } + + - do: + indices.refresh: {} + + - do: + create_pit: + allow_partial_pit_creation: true + index: test_pit + keep_alive: 23h + + - set: {pit_id: pit_id} + - match: { _shards.failed: 0} + - do: + search: + rest_total_hits_as_int: true + size: 1 + sort: foo + body: + query: + match_all: {} + pit: {"id": "$pit_id"} + + - match: {hits.total: 2 } + - length: {hits.hits: 1 } + - match: {hits.hits.0._id: "42" } + + - do: + index: + index: test_pit + id: 44 + body: { foo: 3 } + + - do: + indices.refresh: {} + + - do: + search: + rest_total_hits_as_int: true + size: 1 + sort: foo + body: + query: + match_all: {} + pit: {"id": "$pit_id", "keep_alive":"10m"} + + - match: {hits.total: 2 } + - length: {hits.hits: 1 } + - match: {hits.hits.0._id: "42" } + + + - do: + search: + rest_total_hits_as_int: true + index: test_pit + size: 1 + sort: foo + body: + query: + match_all: {} + + - match: {hits.total: 3 } + - length: {hits.hits: 1 } + + - do: + delete_pit: + body: + "pit_id": [$pit_id] + + - match: {pits.0.pit_id: $pit_id} + - match: {pits.0.successful: true } + +--- +"Delete all": + - skip: + version: " - 2.9.99" + reason: "mode to be introduced later than 3.0" + - do: + indices.create: + index: test_pit + - do: + index: + index: test_pit + id: 42 + body: { foo: 1 } + + - do: + index: + index: test_pit + id: 43 + body: { foo: 2 } + + - do: + indices.refresh: {} + + - do: + create_pit: + allow_partial_pit_creation: true + index: test_pit + keep_alive: 23h + + - set: {pit_id: pit_id} + - match: { _shards.failed: 0} + + - do: + delete_all_pits: {} + + - match: {pits.0.pit_id: $pit_id} + - match: {pits.0.successful: true } + + - do: + catch: missing + delete_all_pits: { } diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index 797c5c38fada6..74be544123d9f 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -240,12 +240,14 @@ import org.opensearch.action.search.DeletePitAction; import org.opensearch.action.search.GetAllPitsAction; import org.opensearch.action.search.MultiSearchAction; +import org.opensearch.action.search.NodesGetAllPitsAction; import org.opensearch.action.search.SearchAction; import org.opensearch.action.search.SearchScrollAction; import org.opensearch.action.search.TransportClearScrollAction; import org.opensearch.action.search.TransportCreatePitAction; import org.opensearch.action.search.TransportDeletePitAction; import org.opensearch.action.search.TransportGetAllPitsAction; +import org.opensearch.action.search.TransportNodesGetAllPitsAction; import org.opensearch.action.search.TransportMultiSearchAction; import org.opensearch.action.search.TransportSearchAction; import org.opensearch.action.search.TransportSearchScrollAction; @@ -408,6 +410,8 @@ import org.opensearch.rest.action.ingest.RestSimulatePipelineAction; import org.opensearch.rest.action.search.RestClearScrollAction; import org.opensearch.rest.action.search.RestCountAction; +import org.opensearch.rest.action.search.RestCreatePitAction; +import org.opensearch.rest.action.search.RestDeletePitAction; import org.opensearch.rest.action.search.RestExplainAction; import org.opensearch.rest.action.search.RestMultiSearchAction; import org.opensearch.rest.action.search.RestSearchAction; @@ -674,6 +678,7 @@ public void reg actions.register(GetAllPitsAction.INSTANCE, TransportGetAllPitsAction.class); actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class); actions.register(PitSegmentsAction.INSTANCE, TransportPitSegmentsAction.class); + actions.register(NodesGetAllPitsAction.INSTANCE, TransportNodesGetAllPitsAction.class); // Remote Store actions.register(RestoreRemoteStoreAction.INSTANCE, TransportRestoreRemoteStoreAction.class); @@ -849,6 +854,11 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestRepositoriesAction()); registerHandler.accept(new RestSnapshotAction()); registerHandler.accept(new RestTemplatesAction()); + + // Point in time API + registerHandler.accept(new RestCreatePitAction()); + registerHandler.accept(new RestDeletePitAction()); + for (ActionPlugin plugin : actionPlugins) { for (RestHandler handler : plugin.getRestHandlers( settings, diff --git a/server/src/main/java/org/opensearch/action/search/DeletePitInfo.java b/server/src/main/java/org/opensearch/action/search/DeletePitInfo.java index 943199812771a..5a167c5a6f160 100644 --- a/server/src/main/java/org/opensearch/action/search/DeletePitInfo.java +++ b/server/src/main/java/org/opensearch/action/search/DeletePitInfo.java @@ -65,11 +65,11 @@ public void writeTo(StreamOutput out) throws IOException { static { PARSER.declareBoolean(constructorArg(), new ParseField("successful")); - PARSER.declareString(constructorArg(), new ParseField("pitId")); + PARSER.declareString(constructorArg(), new ParseField("pit_id")); } private static final ParseField SUCCESSFUL = new ParseField("successful"); - private static final ParseField PIT_ID = new ParseField("pitId"); + private static final ParseField PIT_ID = new ParseField("pit_id"); @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { diff --git a/server/src/main/java/org/opensearch/action/search/DeletePitRequest.java b/server/src/main/java/org/opensearch/action/search/DeletePitRequest.java index 945fcfd17eb6c..926e9c19a33f5 100644 --- a/server/src/main/java/org/opensearch/action/search/DeletePitRequest.java +++ b/server/src/main/java/org/opensearch/action/search/DeletePitRequest.java @@ -48,6 +48,11 @@ public DeletePitRequest(List pitIds) { this.pitIds.addAll(pitIds); } + public void clearAndSetPitIds(List pitIds) { + this.pitIds.clear(); + this.pitIds.addAll(pitIds); + } + public DeletePitRequest() {} public List getPitIds() { diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java index b4ad2f6641087..340f9b842adbf 100644 --- a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesRequest.java @@ -21,11 +21,22 @@ */ public class GetAllPitNodesRequest extends BaseNodesRequest { + // Security plugin intercepts and sets the response with permitted PIT contexts + private GetAllPitNodesResponse getAllPitNodesResponse; + @Inject public GetAllPitNodesRequest(DiscoveryNode... concreteNodes) { super(concreteNodes); } + public void setGetAllPitNodesResponse(GetAllPitNodesResponse getAllPitNodesResponse) { + this.getAllPitNodesResponse = getAllPitNodesResponse; + } + + public GetAllPitNodesResponse getGetAllPitNodesResponse() { + return getAllPitNodesResponse; + } + public GetAllPitNodesRequest(StreamInput in) throws IOException { super(in); } diff --git a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java index 4a454e7145eff..091447798cf5f 100644 --- a/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java +++ b/server/src/main/java/org/opensearch/action/search/GetAllPitNodesResponse.java @@ -52,6 +52,14 @@ public GetAllPitNodesResponse( ); } + /** + * Copy constructor that explicitly sets the list pit infos + */ + public GetAllPitNodesResponse(List listPitInfos, GetAllPitNodesResponse response) { + super(response.getClusterName(), response.getNodes(), response.failures()); + pitInfos.addAll(listPitInfos); + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); diff --git a/server/src/main/java/org/opensearch/action/search/NodesGetAllPitsAction.java b/server/src/main/java/org/opensearch/action/search/NodesGetAllPitsAction.java new file mode 100644 index 0000000000000..af41f7d49551c --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/NodesGetAllPitsAction.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.ActionType; + +/** + * Action type for retrieving all PIT reader contexts from nodes + */ +public class NodesGetAllPitsAction extends ActionType { + public static final NodesGetAllPitsAction INSTANCE = new NodesGetAllPitsAction(); + public static final String NAME = "cluster:admin/point_in_time/read_from_nodes"; + + private NodesGetAllPitsAction() { + super(NAME, GetAllPitNodesResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/search/PitService.java b/server/src/main/java/org/opensearch/action/search/PitService.java index 0b79b77fd6014..ff068397ad94e 100644 --- a/server/src/main/java/org/opensearch/action/search/PitService.java +++ b/server/src/main/java/org/opensearch/action/search/PitService.java @@ -15,6 +15,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; import org.opensearch.action.support.GroupedActionListener; +import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Strings; @@ -47,12 +48,19 @@ public class PitService { private final ClusterService clusterService; private final SearchTransportService searchTransportService; private final TransportService transportService; + private final NodeClient nodeClient; @Inject - public PitService(ClusterService clusterService, SearchTransportService searchTransportService, TransportService transportService) { + public PitService( + ClusterService clusterService, + SearchTransportService searchTransportService, + TransportService transportService, + NodeClient nodeClient + ) { this.clusterService = clusterService; this.searchTransportService = searchTransportService; this.transportService = transportService; + this.nodeClient = nodeClient; } /** @@ -144,6 +152,17 @@ public void onFailure(final Exception e) { }, size); } + /** + * This method returns indices associated for each pit + */ + public Map getIndicesForPits(List pitIds) { + Map pitToIndicesMap = new HashMap<>(); + for (String pitId : pitIds) { + pitToIndicesMap.put(pitId, SearchContextId.decode(nodeClient.getNamedWriteableRegistry(), pitId).getActualIndices()); + } + return pitToIndicesMap; + } + /** * Get all active point in time contexts */ @@ -156,7 +175,7 @@ public void getAllPits(ActionListener getAllPitsListener DiscoveryNode[] disNodesArr = nodes.toArray(new DiscoveryNode[nodes.size()]); transportService.sendRequest( transportService.getLocalNode(), - GetAllPitsAction.NAME, + NodesGetAllPitsAction.NAME, new GetAllPitNodesRequest(disNodesArr), new TransportResponseHandler() { diff --git a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java index f9e36c479dd54..19abe2361290d 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java @@ -57,7 +57,11 @@ public TransportDeletePitAction( @Override protected void doExecute(Task task, DeletePitRequest request, ActionListener listener) { List pitIds = request.getPitIds(); - if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) { + // when security plugin intercepts the request, if PITs are not present in the cluster the PIT IDs in request will be empty + // and in this case return empty response + if (pitIds.isEmpty()) { + listener.onResponse(new DeletePitResponse(new ArrayList<>())); + } else if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) { deleteAllPits(listener); } else { deletePits(listener, request); diff --git a/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java b/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java index 21a64e388fa7b..c8529c5b02bd4 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportGetAllPitsAction.java @@ -8,79 +8,31 @@ package org.opensearch.action.search; -import org.opensearch.action.FailedNodeException; +import org.opensearch.action.ActionListener; import org.opensearch.action.support.ActionFilters; -import org.opensearch.action.support.nodes.TransportNodesAction; -import org.opensearch.cluster.service.ClusterService; +import org.opensearch.action.support.HandledTransportAction; import org.opensearch.common.inject.Inject; -import org.opensearch.common.io.stream.StreamInput; -import org.opensearch.search.SearchService; -import org.opensearch.threadpool.ThreadPool; +import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; -import java.io.IOException; -import java.util.List; - /** - * Transport action to get all active PIT contexts across all nodes + * Transport action to get all active PIT contexts across the cluster */ -public class TransportGetAllPitsAction extends TransportNodesAction< - GetAllPitNodesRequest, - GetAllPitNodesResponse, - GetAllPitNodeRequest, - GetAllPitNodeResponse> { - private final SearchService searchService; +public class TransportGetAllPitsAction extends HandledTransportAction { + private final PitService pitService; @Inject - public TransportGetAllPitsAction( - ThreadPool threadPool, - ClusterService clusterService, - TransportService transportService, - ActionFilters actionFilters, - SearchService searchService - ) { - super( - GetAllPitsAction.NAME, - threadPool, - clusterService, - transportService, - actionFilters, - GetAllPitNodesRequest::new, - GetAllPitNodeRequest::new, - ThreadPool.Names.SAME, - GetAllPitNodeResponse.class - ); - this.searchService = searchService; - } - - @Override - protected GetAllPitNodesResponse newResponse( - GetAllPitNodesRequest request, - List getAllPitNodeRespons, - List failures - ) { - return new GetAllPitNodesResponse(clusterService.getClusterName(), getAllPitNodeRespons, failures); - } - - @Override - protected GetAllPitNodeRequest newNodeRequest(GetAllPitNodesRequest request) { - return new GetAllPitNodeRequest(); - } - - @Override - protected GetAllPitNodeResponse newNodeResponse(StreamInput in) throws IOException { - return new GetAllPitNodeResponse(in); + public TransportGetAllPitsAction(ActionFilters actionFilters, TransportService transportService, PitService pitService) { + super(GetAllPitsAction.NAME, transportService, actionFilters, in -> new GetAllPitNodesRequest(in)); + this.pitService = pitService; } - /** - * This retrieves all active PITs in the node - */ - @Override - protected GetAllPitNodeResponse nodeOperation(GetAllPitNodeRequest request) { - GetAllPitNodeResponse nodeResponse = new GetAllPitNodeResponse( - transportService.getLocalNode(), - searchService.getAllPITReaderContexts() - ); - return nodeResponse; + protected void doExecute(Task task, GetAllPitNodesRequest request, ActionListener listener) { + // If security plugin intercepts the request, it'll replace all PIT IDs with permitted PIT IDs + if (request.getGetAllPitNodesResponse() != null) { + listener.onResponse(request.getGetAllPitNodesResponse()); + } else { + pitService.getAllPits(listener); + } } } diff --git a/server/src/main/java/org/opensearch/action/search/TransportNodesGetAllPitsAction.java b/server/src/main/java/org/opensearch/action/search/TransportNodesGetAllPitsAction.java new file mode 100644 index 0000000000000..520830cd293f0 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/search/TransportNodesGetAllPitsAction.java @@ -0,0 +1,86 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.action.FailedNodeException; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.nodes.TransportNodesAction; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.search.SearchService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; +import java.util.List; + +/** + * Transport action to get all active PIT contexts across all nodes + */ +public class TransportNodesGetAllPitsAction extends TransportNodesAction< + GetAllPitNodesRequest, + GetAllPitNodesResponse, + GetAllPitNodeRequest, + GetAllPitNodeResponse> { + private final SearchService searchService; + + @Inject + public TransportNodesGetAllPitsAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + SearchService searchService + ) { + super( + NodesGetAllPitsAction.NAME, + threadPool, + clusterService, + transportService, + actionFilters, + GetAllPitNodesRequest::new, + GetAllPitNodeRequest::new, + ThreadPool.Names.SAME, + GetAllPitNodeResponse.class + ); + this.searchService = searchService; + } + + @Override + protected GetAllPitNodesResponse newResponse( + GetAllPitNodesRequest request, + List getAllPitNodeRespons, + List failures + ) { + return new GetAllPitNodesResponse(clusterService.getClusterName(), getAllPitNodeRespons, failures); + } + + @Override + protected GetAllPitNodeRequest newNodeRequest(GetAllPitNodesRequest request) { + return new GetAllPitNodeRequest(); + } + + @Override + protected GetAllPitNodeResponse newNodeResponse(StreamInput in) throws IOException { + return new GetAllPitNodeResponse(in); + } + + /** + * This retrieves all active PITs in the node + */ + @Override + protected GetAllPitNodeResponse nodeOperation(GetAllPitNodeRequest request) { + GetAllPitNodeResponse nodeResponse = new GetAllPitNodeResponse( + transportService.getLocalNode(), + searchService.getAllPITReaderContexts() + ); + return nodeResponse; + } +} diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestCreatePitAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestCreatePitAction.java new file mode 100644 index 0000000000000..9439670880015 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/search/RestCreatePitAction.java @@ -0,0 +1,57 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.search; + +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.support.IndicesOptions; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.Strings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestStatusToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RestRequest.Method.POST; + +/** + * Rest action for creating PIT context + */ +public class RestCreatePitAction extends BaseRestHandler { + public static String ALLOW_PARTIAL_PIT_CREATION = "allow_partial_pit_creation"; + public static String KEEP_ALIVE = "keep_alive"; + + @Override + public String getName() { + return "create_pit_action"; + } + + @Override + public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + boolean allowPartialPitCreation = request.paramAsBoolean(ALLOW_PARTIAL_PIT_CREATION, true); + String[] indices = Strings.splitStringByCommaToArray(request.param("index")); + TimeValue keepAlive = request.paramAsTime(KEEP_ALIVE, null); + CreatePitRequest createPitRequest = new CreatePitRequest(keepAlive, allowPartialPitCreation, indices); + createPitRequest.setIndicesOptions(IndicesOptions.fromRequest(request, createPitRequest.indicesOptions())); + createPitRequest.setPreference(request.param("preference")); + createPitRequest.setRouting(request.param("routing")); + + return channel -> client.createPit(createPitRequest, new RestStatusToXContentListener<>(channel)); + } + + @Override + public List routes() { + return unmodifiableList(asList(new Route(POST, "/{index}/_search/point_in_time"))); + } + +} diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestDeletePitAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestDeletePitAction.java new file mode 100644 index 0000000000000..452e66f8f5018 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/search/RestDeletePitAction.java @@ -0,0 +1,60 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.search; + +import org.opensearch.action.search.DeletePitRequest; +import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestStatusToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static java.util.Arrays.asList; +import static java.util.Collections.unmodifiableList; +import static org.opensearch.rest.RestRequest.Method.DELETE; + +/** + * Rest action for deleting PIT contexts + */ +public class RestDeletePitAction extends BaseRestHandler { + + @Override + public String getName() { + return "delete_pit_action"; + } + + @Override + public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + String allPitIdsQualifier = "_all"; + final DeletePitRequest deletePITRequest; + if (request.path().contains(allPitIdsQualifier)) { + deletePITRequest = new DeletePitRequest(asList(allPitIdsQualifier)); + } else { + deletePITRequest = new DeletePitRequest(); + request.withContentOrSourceParamParserOrNull((xContentParser -> { + if (xContentParser != null) { + try { + deletePITRequest.fromXContent(xContentParser); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to parse request body", e); + } + } + })); + } + return channel -> client.deletePits(deletePITRequest, new RestStatusToXContentListener(channel)); + } + + @Override + public List routes() { + return unmodifiableList(asList(new Route(DELETE, "/_search/point_in_time"), new Route(DELETE, "/_search/point_in_time/_all"))); + } +} diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 4bd95da193668..04fab85c163a9 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -881,6 +881,7 @@ public void createPitReaderContext(ShardId shardId, TimeValue keepAlive, ActionL shard.awaitShardSearchActive(ignored -> { Engine.SearcherSupplier searcherSupplier = null; ReaderContext readerContext = null; + Releasable decreasePitContexts = openPitContexts::decrementAndGet; try { if (openPitContexts.incrementAndGet() > maxOpenPitContext) { throw new OpenSearchRejectedExecutionException( @@ -902,15 +903,16 @@ public void createPitReaderContext(ShardId shardId, TimeValue keepAlive, ActionL searchOperationListener.onNewPitContext(finalReaderContext); readerContext.addOnClose(() -> { - openPitContexts.decrementAndGet(); searchOperationListener.onFreeReaderContext(finalReaderContext); searchOperationListener.onFreePitContext(finalReaderContext); }); + readerContext.addOnClose(decreasePitContexts); // add the newly created pit reader context to active readers putReaderContext(readerContext); readerContext = null; listener.onResponse(finalReaderContext.id()); } catch (Exception exc) { + Releasables.closeWhileHandlingException(decreasePitContexts); Releasables.closeWhileHandlingException(searcherSupplier, readerContext); listener.onFailure(exc); } diff --git a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java index a5c6e1c12b79c..c03c27f7d7e4d 100644 --- a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java +++ b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java @@ -14,6 +14,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.LatchedActionListener; import org.opensearch.action.StepListener; +import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.node.DiscoveryNode; @@ -70,6 +71,8 @@ public class CreatePitControllerTests extends OpenSearchTestCase { ClusterService clusterServiceMock = null; private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); + Settings settings = Settings.builder().put("node.name", CreatePitControllerTests.class.getSimpleName()).build(); + NodeClient client = new NodeClient(settings, threadPool); @Override public void tearDown() throws Exception { @@ -219,7 +222,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); - PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService, client); CreatePitController controller = new CreatePitController( searchTransportService, clusterServiceMock, @@ -308,7 +311,7 @@ public void sendFreePITContexts( CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); - PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService, client); CreatePitController controller = new CreatePitController( searchTransportService, clusterServiceMock, @@ -406,7 +409,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); - PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService, client); CreatePitController controller = new CreatePitController( searchTransportService, clusterServiceMock, @@ -494,7 +497,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod }; CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); request.setIndices(new String[] { "index" }); - PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService, client); CreatePitController controller = new CreatePitController( searchTransportService, clusterServiceMock, diff --git a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java index 7a1d9a6fe963c..bdc0440a89f69 100644 --- a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java @@ -13,6 +13,7 @@ import org.opensearch.action.support.ActionFilter; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.PlainActionFuture; +import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.Metadata; @@ -62,6 +63,7 @@ public class TransportDeletePitActionTests extends OpenSearchTestCase { ClusterService clusterServiceMock = null; Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); private ThreadPool threadPool = new ThreadPool(settings); + NodeClient client = new NodeClient(settings, threadPool); @Override public void tearDown() throws Exception { @@ -165,7 +167,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService, client); TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, @@ -229,7 +231,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService) { + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService, client) { @Override public void getAllPits(ActionListener getAllPitsListener) { ListPitInfo listPitInfo = new ListPitInfo(getPitId(), 0, 0); @@ -312,7 +314,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService, client); TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, @@ -371,7 +373,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService, client); TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, @@ -439,7 +441,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService); + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService, client); TransportDeletePitAction action = new TransportDeletePitAction( transportService, actionFilters, @@ -505,7 +507,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService) { + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService, client) { @Override public void getAllPits(ActionListener getAllPitsListener) { ListPitInfo listPitInfo = new ListPitInfo(getPitId(), 0, 0); @@ -581,7 +583,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService) { + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService, client) { @Override public void getAllPits(ActionListener getAllPitsListener) { ListPitInfo listPitInfo = new ListPitInfo(getPitId(), 0, 0); @@ -661,7 +663,7 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } }; - PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService) { + PitService pitService = new PitService(clusterServiceMock, searchTransportService, transportService, client) { @Override public void getAllPits(ActionListener getAllPitsListener) { ListPitInfo listPitInfo = new ListPitInfo(getPitId(), 0, 0); diff --git a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java index a10f004b2ee97..9a28f1800847e 100644 --- a/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java +++ b/server/src/test/java/org/opensearch/search/CreatePitSingleNodeTests.java @@ -14,6 +14,10 @@ import org.opensearch.action.search.CreatePitController; import org.opensearch.action.search.CreatePitRequest; import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.DeletePitAction; +import org.opensearch.action.search.DeletePitInfo; +import org.opensearch.action.search.DeletePitRequest; +import org.opensearch.action.search.DeletePitResponse; import org.opensearch.action.search.PitTestsUtil; import org.opensearch.action.search.SearchPhaseExecutionException; import org.opensearch.action.search.SearchResponse; @@ -33,6 +37,8 @@ import java.util.concurrent.ExecutionException; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.blankOrNullString; +import static org.hamcrest.Matchers.not; import static org.opensearch.action.search.PitTestsUtil.assertSegments; import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; @@ -282,6 +288,52 @@ public void testMaxOpenPitContexts() throws Exception { validatePitStats("index", 0, maxPitContexts, 0); } + public void testCreatePitMoreThanMaxOpenPitContexts() throws Exception { + createIndex("index"); + client().prepareIndex("index").setId("1").setSource("field", "value").setRefreshPolicy(IMMEDIATE).get(); + + CreatePitRequest request = new CreatePitRequest(TimeValue.timeValueDays(1), true); + request.setIndices(new String[] { "index" }); + SearchService service = getInstanceFromNode(SearchService.class); + + try { + for (int i = 0; i < 1000; i++) { + client().execute(CreatePitAction.INSTANCE, request).get(); + } + } catch (Exception ex) { + assertTrue( + ex.getMessage() + .contains( + "Trying to create too many Point In Time contexts. " + + "Must be less than or equal to: [" + + SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY) + + "]. " + + "This limit can be set by changing the [search.max_open_pit_context] setting." + ) + ); + } + final int maxPitContexts = SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY); + validatePitStats("index", maxPitContexts, 0, 0); + // deleteall + DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); + + /** + * When we invoke delete again, returns success after clearing the remaining readers. Asserting reader context + * not found exceptions don't result in failures ( as deletion in one node is successful ) + */ + ActionFuture execute = client().execute(DeletePitAction.INSTANCE, deletePITRequest); + DeletePitResponse deletePITResponse = execute.get(); + for (DeletePitInfo deletePitInfo : deletePITResponse.getDeletePitResults()) { + assertThat(deletePitInfo.getPitId(), not(blankOrNullString())); + assertTrue(deletePitInfo.isSuccessful()); + } + validatePitStats("index", 0, maxPitContexts, 0); + client().execute(CreatePitAction.INSTANCE, request).get(); + validatePitStats("index", 1, maxPitContexts, 0); + service.doClose(); + validatePitStats("index", 0, maxPitContexts + 1, 0); + } + public void testOpenPitContextsConcurrently() throws Exception { createIndex("index"); final int maxPitContexts = SearchService.MAX_OPEN_PIT_CONTEXT.get(Settings.EMPTY); diff --git a/server/src/test/java/org/opensearch/search/pit/RestCreatePitActionTests.java b/server/src/test/java/org/opensearch/search/pit/RestCreatePitActionTests.java new file mode 100644 index 0000000000000..5ca384daedbff --- /dev/null +++ b/server/src/test/java/org/opensearch/search/pit/RestCreatePitActionTests.java @@ -0,0 +1,78 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pit; + +import org.apache.lucene.util.SetOnce; +import org.opensearch.action.ActionListener; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.search.RestCreatePitAction; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.client.NoOpNodeClient; +import org.opensearch.test.rest.FakeRestChannel; +import org.opensearch.test.rest.FakeRestRequest; + +import java.util.HashMap; +import java.util.Map; + +import static org.hamcrest.Matchers.equalTo; + +/** + * Tests to verify behavior of create pit rest action + */ +public class RestCreatePitActionTests extends OpenSearchTestCase { + public void testRestCreatePit() throws Exception { + SetOnce createPitCalled = new SetOnce<>(); + RestCreatePitAction action = new RestCreatePitAction(); + try (NodeClient nodeClient = new NoOpNodeClient(this.getTestName()) { + @Override + public void createPit(CreatePitRequest request, ActionListener listener) { + createPitCalled.set(true); + assertThat(request.getKeepAlive().getStringRep(), equalTo("1m")); + assertFalse(request.shouldAllowPartialPitCreation()); + } + }) { + Map params = new HashMap<>(); + params.put("keep_alive", "1m"); + params.put("allow_partial_pit_creation", "false"); + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withParams(params) + .withMethod(RestRequest.Method.POST) + .build(); + FakeRestChannel channel = new FakeRestChannel(request, false, 0); + action.handleRequest(request, channel, nodeClient); + + assertThat(createPitCalled.get(), equalTo(true)); + } + } + + public void testRestCreatePitDefaultPartialCreation() throws Exception { + SetOnce createPitCalled = new SetOnce<>(); + RestCreatePitAction action = new RestCreatePitAction(); + try (NodeClient nodeClient = new NoOpNodeClient(this.getTestName()) { + @Override + public void createPit(CreatePitRequest request, ActionListener listener) { + createPitCalled.set(true); + assertThat(request.getKeepAlive().getStringRep(), equalTo("1m")); + assertTrue(request.shouldAllowPartialPitCreation()); + } + }) { + Map params = new HashMap<>(); + params.put("keep_alive", "1m"); + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withParams(params) + .withMethod(RestRequest.Method.POST) + .build(); + FakeRestChannel channel = new FakeRestChannel(request, false, 0); + action.handleRequest(request, channel, nodeClient); + + assertThat(createPitCalled.get(), equalTo(true)); + } + } +} diff --git a/server/src/test/java/org/opensearch/search/pit/RestDeletePitActionTests.java b/server/src/test/java/org/opensearch/search/pit/RestDeletePitActionTests.java new file mode 100644 index 0000000000000..0bfa16aafe1e3 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/pit/RestDeletePitActionTests.java @@ -0,0 +1,133 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.pit; + +import org.apache.lucene.util.SetOnce; +import org.opensearch.action.ActionListener; +import org.opensearch.action.search.DeletePitRequest; +import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.bytes.BytesArray; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.search.RestDeletePitAction; +import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.client.NoOpNodeClient; +import org.opensearch.test.rest.FakeRestChannel; +import org.opensearch.test.rest.FakeRestRequest; + +import java.util.Collections; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; + +/** + * Tests to verify the behavior of rest delete pit action for list delete and delete all PIT endpoints + */ +public class RestDeletePitActionTests extends OpenSearchTestCase { + public void testParseDeletePitRequestWithInvalidJsonThrowsException() throws Exception { + RestDeletePitAction action = new RestDeletePitAction(); + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withContent( + new BytesArray("{invalid_json}"), + XContentType.JSON + ).build(); + Exception e = expectThrows(IllegalArgumentException.class, () -> action.prepareRequest(request, null)); + assertThat(e.getMessage(), equalTo("Failed to parse request body")); + } + + public void testDeletePitWithBody() throws Exception { + SetOnce pitCalled = new SetOnce<>(); + try (NodeClient nodeClient = new NoOpNodeClient(this.getTestName()) { + @Override + public void deletePits(DeletePitRequest request, ActionListener listener) { + pitCalled.set(true); + assertThat(request.getPitIds(), hasSize(1)); + assertThat(request.getPitIds().get(0), equalTo("BODY")); + } + }) { + RestDeletePitAction action = new RestDeletePitAction(); + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withContent( + new BytesArray("{\"pit_id\": [\"BODY\"]}"), + XContentType.JSON + ).build(); + FakeRestChannel channel = new FakeRestChannel(request, false, 0); + action.handleRequest(request, channel, nodeClient); + + assertThat(pitCalled.get(), equalTo(true)); + } + } + + public void testDeleteAllPit() throws Exception { + SetOnce pitCalled = new SetOnce<>(); + try (NodeClient nodeClient = new NoOpNodeClient(this.getTestName()) { + @Override + public void deletePits(DeletePitRequest request, ActionListener listener) { + pitCalled.set(true); + assertThat(request.getPitIds(), hasSize(1)); + assertThat(request.getPitIds().get(0), equalTo("_all")); + } + }) { + RestDeletePitAction action = new RestDeletePitAction(); + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withPath("/_all").build(); + FakeRestChannel channel = new FakeRestChannel(request, false, 0); + action.handleRequest(request, channel, nodeClient); + + assertThat(pitCalled.get(), equalTo(true)); + } + } + + public void testDeleteAllPitWithBody() { + SetOnce pitCalled = new SetOnce<>(); + try (NodeClient nodeClient = new NoOpNodeClient(this.getTestName()) { + @Override + public void deletePits(DeletePitRequest request, ActionListener listener) { + pitCalled.set(true); + assertThat(request.getPitIds(), hasSize(1)); + assertThat(request.getPitIds().get(0), equalTo("_all")); + } + }) { + RestDeletePitAction action = new RestDeletePitAction(); + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withContent( + new BytesArray("{\"pit_id\": [\"BODY\"]}"), + XContentType.JSON + ).withPath("/_all").build(); + FakeRestChannel channel = new FakeRestChannel(request, false, 0); + + IllegalArgumentException ex = expectThrows( + IllegalArgumentException.class, + () -> action.handleRequest(request, channel, nodeClient) + ); + assertTrue(ex.getMessage().contains("request [GET /_all] does not support having a body")); + } + } + + public void testDeletePitQueryStringParamsShouldThrowException() { + SetOnce pitCalled = new SetOnce<>(); + try (NodeClient nodeClient = new NoOpNodeClient(this.getTestName()) { + @Override + public void deletePits(DeletePitRequest request, ActionListener listener) { + pitCalled.set(true); + assertThat(request.getPitIds(), hasSize(2)); + assertThat(request.getPitIds().get(0), equalTo("QUERY_STRING")); + assertThat(request.getPitIds().get(1), equalTo("QUERY_STRING_1")); + } + }) { + RestDeletePitAction action = new RestDeletePitAction(); + RestRequest request = new FakeRestRequest.Builder(xContentRegistry()).withParams( + Collections.singletonMap("pit_id", "QUERY_STRING,QUERY_STRING_1") + ).build(); + FakeRestChannel channel = new FakeRestChannel(request, false, 0); + IllegalArgumentException ex = expectThrows( + IllegalArgumentException.class, + () -> action.handleRequest(request, channel, nodeClient) + ); + assertTrue(ex.getMessage().contains("unrecognized param")); + } + } +}