Skip to content

Commit

Permalink
[Backport 2.x] [Point in time] Backport point in time changes (#4616)
Browse files Browse the repository at this point in the history
* Adding create pit service layer changes

Signed-off-by: Bharathwaj G <[email protected]>

* Adding delete pit service layer changes

Signed-off-by: Bharathwaj G <[email protected]>

* Add service layer changes for list all PITs

Signed-off-by: Bharathwaj G <[email protected]>

* Add Point In Time Node Stats API ServiceLayer Changes

Signed-off-by: Bharathwaj G <[email protected]>

* Add changes to Point in time segments API service layer

Signed-off-by: Bharathwaj G <[email protected]>

* Add changes for Create PIT and Delete PIT rest layer and rest high level client

Signed-off-by: Bharathwaj G <[email protected]>

* Added restlayer changes for pit stats

Signed-off-by: Bharathwaj G <[email protected]>

* 2.4 specific fixes

Signed-off-by: Bharathwaj G <[email protected]>

* Modified cat shards test for pit stats

Signed-off-by: Bharathwaj G <[email protected]>

* Added rest layer changes for List all PITs and PIT segments

Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie authored Oct 6, 2022
1 parent a12feea commit 2c0b4e7
Show file tree
Hide file tree
Showing 79 changed files with 8,097 additions and 31 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
### Added
- Add support for s390x architecture ([#4001](https://github.com/opensearch-project/OpenSearch/pull/4001))
- Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085))
- Point in time rest layer changes for create and delete PIT API ([#4064](https://github.com/opensearch-project/OpenSearch/pull/4064))
- Add failover support with Segment Replication enabled. ([#4325](https://github.com/opensearch-project/OpenSearch/pull/4325)
- Point in time rest layer changes for list PIT and PIT segments API ([#4388](https://github.com/opensearch-project/OpenSearch/pull/4388))
- Added @dreamer-89 as an Opensearch maintainer ([#4342](https://github.com/opensearch-project/OpenSearch/pull/4342))
- Added release notes for 1.3.5 ([#4343](https://github.com/opensearch-project/OpenSearch/pull/4343))
- Added release notes for 2.2.1 ([#4344](https://github.com/opensearch-project/OpenSearch/pull/4344))
Expand Down Expand Up @@ -46,6 +48,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Getting security exception due to access denied 'java.lang.RuntimePermission' 'accessDeclaredMembers' when trying to get snapshot with S3 IRSA ([#4469](https://github.com/opensearch-project/OpenSearch/pull/4469))
- Fixed flaky test `ResourceAwareTasksTests.testTaskIdPersistsInThreadContext` ([#4484](https://github.com/opensearch-project/OpenSearch/pull/4484))
- Fixed the ignore_malformed setting to also ignore objects ([#4494](https://github.com/opensearch-project/OpenSearch/pull/4494))
- Fixed the `_cat/shards/10_basic.yml` test cases fix.
- Updated jackson to 2.13.4 and snakeyml to 1.32 ([#4556](https://github.com/opensearch-project/OpenSearch/pull/4556))
- Fixed day of year defaulting for round up parser ([#4627](https://github.com/opensearch-project/OpenSearch/pull/4627))
- Fixed the SnapshotsInProgress error during index deletion ([#4570](https://github.com/opensearch-project/OpenSearch/pull/4570))
Expand All @@ -59,6 +62,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Github workflow for changelog verification ([#4085](https://github.com/opensearch-project/OpenSearch/pull/4085))
- Add timing data and more granular stages to SegmentReplicationState ([#4367](https://github.com/opensearch-project/OpenSearch/pull/4367))
- BWC version 2.2.2 ([#4385](https://github.com/opensearch-project/OpenSearch/pull/4385))
- Added RestLayer Changes for PIT stats ([#4217](https://github.com/opensearch-project/OpenSearch/pull/4217))
- BWC version 1.3.6 ([#4452](https://github.com/opensearch-project/OpenSearch/pull/4452))
- Bump current version to 2.4.0 on 2.x branch ([#4454](https://github.com/opensearch-project/OpenSearch/pull/4454))
- 2.3.0 release notes ([#4457](https://github.com/opensearch-project/OpenSearch/pull/4457))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchScrollRequest;
Expand Down Expand Up @@ -92,6 +94,7 @@
import org.opensearch.index.reindex.ReindexRequest;
import org.opensearch.index.reindex.UpdateByQueryRequest;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.rest.action.search.RestCreatePitAction;
import org.opensearch.rest.action.search.RestSearchAction;
import org.opensearch.script.mustache.MultiSearchTemplateRequest;
import org.opensearch.script.mustache.SearchTemplateRequest;
Expand Down Expand Up @@ -433,9 +436,19 @@ static void addSearchRequestParams(Params params, SearchRequest searchRequest) {
params.putParam(RestSearchAction.TYPED_KEYS_PARAM, "true");
params.withRouting(searchRequest.routing());
params.withPreference(searchRequest.preference());
params.withIndicesOptions(searchRequest.indicesOptions());
if (searchRequest.pointInTimeBuilder() == null) {
params.withIndicesOptions(searchRequest.indicesOptions());
}
params.withSearchType(searchRequest.searchType().name().toLowerCase(Locale.ROOT));
params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
/**
* Merging search responses as part of CCS flow to reduce roundtrips is not supported for point in time -
* refer to org.opensearch.action.search.SearchResponseMerger
*/
if (searchRequest.pointInTimeBuilder() != null) {
params.putParam("ccs_minimize_roundtrips", "false");
} else {
params.putParam("ccs_minimize_roundtrips", Boolean.toString(searchRequest.isCcsMinimizeRoundtrips()));
}
if (searchRequest.getPreFilterShardSize() != null) {
params.putParam("pre_filter_shard_size", Integer.toString(searchRequest.getPreFilterShardSize()));
}
Expand Down Expand Up @@ -464,6 +477,31 @@ static Request clearScroll(ClearScrollRequest clearScrollRequest) throws IOExcep
return request;
}

static Request createPit(CreatePitRequest createPitRequest) throws IOException {
Params params = new Params();
params.putParam(RestCreatePitAction.ALLOW_PARTIAL_PIT_CREATION, Boolean.toString(createPitRequest.shouldAllowPartialPitCreation()));
params.putParam(RestCreatePitAction.KEEP_ALIVE, createPitRequest.getKeepAlive());
params.withIndicesOptions(createPitRequest.indicesOptions());
Request request = new Request(HttpPost.METHOD_NAME, endpoint(createPitRequest.indices(), "_search/point_in_time"));
request.addParameters(params.asMap());
request.setEntity(createEntity(createPitRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request deletePit(DeletePitRequest deletePitRequest) throws IOException {
Request request = new Request(HttpDelete.METHOD_NAME, "/_search/point_in_time");
request.setEntity(createEntity(deletePitRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request deleteAllPits() {
return new Request(HttpDelete.METHOD_NAME, "/_search/point_in_time/_all");
}

static Request getAllPits() {
return new Request(HttpGet.METHOD_NAME, "/_search/point_in_time/_all");
}

static Request multiSearch(MultiSearchRequest multiSearchRequest) throws IOException {
Request request = new Request(HttpPost.METHOD_NAME, "/_msearch");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.GetAllPitNodesResponse;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
Expand Down Expand Up @@ -1250,6 +1255,154 @@ public final Cancellable scrollAsync(
);
}

/**
* Create PIT context using create PIT API
*
* @param createPitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final CreatePitResponse createPit(CreatePitRequest createPitRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(
createPitRequest,
RequestConverters::createPit,
options,
CreatePitResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously Create PIT context using create PIT API
*
* @param createPitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return the response
*/
public final Cancellable createPitAsync(
CreatePitRequest createPitRequest,
RequestOptions options,
ActionListener<CreatePitResponse> listener
) {
return performRequestAsyncAndParseEntity(
createPitRequest,
RequestConverters::createPit,
options,
CreatePitResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Delete point in time searches using delete PIT API
*
* @param deletePitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final DeletePitResponse deletePit(DeletePitRequest deletePitRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(
deletePitRequest,
RequestConverters::deletePit,
options,
DeletePitResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously Delete point in time searches using delete PIT API
*
* @param deletePitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return the response
*/
public final Cancellable deletePitAsync(
DeletePitRequest deletePitRequest,
RequestOptions options,
ActionListener<DeletePitResponse> listener
) {
return performRequestAsyncAndParseEntity(
deletePitRequest,
RequestConverters::deletePit,
options,
DeletePitResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Delete all point in time searches using delete all PITs API
*
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final DeletePitResponse deleteAllPits(RequestOptions options) throws IOException {
return performRequestAndParseEntity(
new MainRequest(),
(request) -> RequestConverters.deleteAllPits(),
options,
DeletePitResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously Delete all point in time searches using delete all PITs API
*
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return the response
*/
public final Cancellable deleteAllPitsAsync(RequestOptions options, ActionListener<DeletePitResponse> listener) {
return performRequestAsyncAndParseEntity(
new MainRequest(),
(request) -> RequestConverters.deleteAllPits(),
options,
DeletePitResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Get all point in time searches using list all PITs API
*
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final GetAllPitNodesResponse getAllPits(RequestOptions options) throws IOException {
return performRequestAndParseEntity(
new MainRequest(),
(request) -> RequestConverters.getAllPits(),
options,
GetAllPitNodesResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously get all point in time searches using list all PITs API
*
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return the response
*/
public final Cancellable getAllPitsAsync(RequestOptions options, ActionListener<GetAllPitNodesResponse> listener) {
return performRequestAsyncAndParseEntity(
new MainRequest(),
(request) -> RequestConverters.getAllPits(),
options,
GetAllPitNodesResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Clears one or more scroll ids using the Clear Scroll API.
*
Expand Down
131 changes: 131 additions & 0 deletions client/rest-high-level/src/test/java/org/opensearch/client/PitIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.client;

import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.junit.Before;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitInfo;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.GetAllPitNodesResponse;
import org.opensearch.common.unit.TimeValue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
* Tests point in time API with rest high level client
*/
public class PitIT extends OpenSearchRestHighLevelClientTestCase {

@Before
public void indexDocuments() throws IOException {
Request doc1 = new Request(HttpPut.METHOD_NAME, "/index/_doc/1");
doc1.setJsonEntity("{\"type\":\"type1\", \"id\":1, \"num\":10, \"num2\":50}");
client().performRequest(doc1);
Request doc2 = new Request(HttpPut.METHOD_NAME, "/index/_doc/2");
doc2.setJsonEntity("{\"type\":\"type1\", \"id\":2, \"num\":20, \"num2\":40}");
client().performRequest(doc2);
Request doc3 = new Request(HttpPut.METHOD_NAME, "/index/_doc/3");
doc3.setJsonEntity("{\"type\":\"type1\", \"id\":3, \"num\":50, \"num2\":35}");
client().performRequest(doc3);
Request doc4 = new Request(HttpPut.METHOD_NAME, "/index/_doc/4");
doc4.setJsonEntity("{\"type\":\"type2\", \"id\":4, \"num\":100, \"num2\":10}");
client().performRequest(doc4);
Request doc5 = new Request(HttpPut.METHOD_NAME, "/index/_doc/5");
doc5.setJsonEntity("{\"type\":\"type2\", \"id\":5, \"num\":100, \"num2\":10}");
client().performRequest(doc5);
client().performRequest(new Request(HttpPost.METHOD_NAME, "/_refresh"));
}

public void testCreateAndDeletePit() throws IOException {
CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index");
CreatePitResponse createPitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(createPitResponse.getId() != null);
assertEquals(1, createPitResponse.getTotalShards());
assertEquals(1, createPitResponse.getSuccessfulShards());
assertEquals(0, createPitResponse.getFailedShards());
assertEquals(0, createPitResponse.getSkippedShards());
GetAllPitNodesResponse getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT);
List<String> pits = getAllPitResponse.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList());
assertTrue(pits.contains(createPitResponse.getId()));
List<String> pitIds = new ArrayList<>();
pitIds.add(createPitResponse.getId());
DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds);
DeletePitResponse deletePitResponse = execute(deletePitRequest, highLevelClient()::deletePit, highLevelClient()::deletePitAsync);
assertTrue(deletePitResponse.getDeletePitResults().get(0).isSuccessful());
assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(createPitResponse.getId()));
}

public void testDeleteAllAndListAllPits() throws IOException {
CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index");
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
CreatePitResponse pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(pitResponse.getId() != null);
assertTrue(pitResponse1.getId() != null);
DeletePitResponse deletePitResponse = highLevelClient().deleteAllPits(RequestOptions.DEFAULT);
for (DeletePitInfo deletePitInfo : deletePitResponse.getDeletePitResults()) {
assertTrue(deletePitInfo.isSuccessful());
}
pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(pitResponse.getId() != null);
assertTrue(pitResponse1.getId() != null);
GetAllPitNodesResponse getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT);

List<String> pits = getAllPitResponse.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList());
assertTrue(pits.contains(pitResponse.getId()));
assertTrue(pits.contains(pitResponse1.getId()));
ActionListener<DeletePitResponse> deletePitListener = new ActionListener<>() {
@Override
public void onResponse(DeletePitResponse response) {
for (DeletePitInfo deletePitInfo : response.getDeletePitResults()) {
assertTrue(deletePitInfo.isSuccessful());
}
}

@Override
public void onFailure(Exception e) {
if (!(e instanceof OpenSearchStatusException)) {
throw new AssertionError("Delete all failed");
}
}
};
final CreatePitResponse pitResponse3 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);

ActionListener<GetAllPitNodesResponse> getPitsListener = new ActionListener<GetAllPitNodesResponse>() {
@Override
public void onResponse(GetAllPitNodesResponse response) {
List<String> pits = response.getPitInfos().stream().map(r -> r.getPitId()).collect(Collectors.toList());
assertTrue(pits.contains(pitResponse3.getId()));
}

@Override
public void onFailure(Exception e) {
if (!(e instanceof OpenSearchStatusException)) {
throw new AssertionError("List all PITs failed", e);
}
}
};
highLevelClient().getAllPitsAsync(RequestOptions.DEFAULT, getPitsListener);
highLevelClient().deleteAllPitsAsync(RequestOptions.DEFAULT, deletePitListener);
// validate no pits case
getAllPitResponse = highLevelClient().getAllPits(RequestOptions.DEFAULT);
assertTrue(getAllPitResponse.getPitInfos().size() == 0);
highLevelClient().deleteAllPitsAsync(RequestOptions.DEFAULT, deletePitListener);
}
}
Loading

0 comments on commit 2c0b4e7

Please sign in to comment.