Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete PIT changes #3401

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchScrollRequest;
Expand Down Expand Up @@ -477,6 +478,17 @@ static Request createPit(CreatePitRequest createPitRequest) throws IOException {
return request;
}

static Request deletePit(DeletePitRequest deletePitRequest) throws IOException {
Request request = new Request(HttpDelete.METHOD_NAME, "/_search/point_in_time");
request.setEntity(createEntity(deletePitRequest, REQUEST_BODY_CONTENT_TYPE));
return request;
}

static Request deleteAllPits(DeletePitRequest deletePitRequest) {
Request request = new Request(HttpDelete.METHOD_NAME, "/_search/point_in_time/_all");
return request;
}

static Request clearScroll(ClearScrollRequest clearScrollRequest) throws IOException {
Request request = new Request(HttpDelete.METHOD_NAME, "/_search/scroll");
request.setEntity(createEntity(clearScrollRequest, REQUEST_BODY_CONTENT_TYPE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
Expand Down Expand Up @@ -1298,6 +1300,86 @@ public final Cancellable createPitAsync(
);
}

/**
* Delete point in time searches using delete PIT API
*
* @param deletePitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final DeletePitResponse deletePit(DeletePitRequest deletePitRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(
deletePitRequest,
RequestConverters::deletePit,
options,
DeletePitResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously Delete point in time searches using delete PIT API
*
* @param deletePitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return the response
*/
public final Cancellable deletePitAsync(
DeletePitRequest deletePitRequest,
RequestOptions options,
ActionListener<DeletePitResponse> listener
) {
return performRequestAsyncAndParseEntity(
deletePitRequest,
RequestConverters::deletePit,
options,
DeletePitResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Delete all point in time searches using delete all PITs API
*
* @param deletePitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
*/
public final DeletePitResponse deleteAllPits(DeletePitRequest deletePitRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(
deletePitRequest,
RequestConverters::deleteAllPits,
options,
DeletePitResponse::fromXContent,
emptySet()
);
}

/**
* Asynchronously Delete all point in time searches using delete all PITs API
*
* @param deletePitRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
* @return the response
*/
public final Cancellable deleteAllPitsAsync(
DeletePitRequest deletePitRequest,
RequestOptions options,
ActionListener<DeletePitResponse> listener
) {
return performRequestAsyncAndParseEntity(
deletePitRequest,
RequestConverters::deleteAllPits,
options,
DeletePitResponse::fromXContent,
listener,
emptySet()
);
}

/**
* Clears one or more scroll ids using the Clear Scroll API.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@
import org.junit.Before;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitInfo;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.common.unit.TimeValue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -51,8 +56,30 @@ public void testCreatePit() throws IOException {
assertEquals(1, pitResponse.getSuccessfulShards());
assertEquals(0, pitResponse.getFailedShards());
assertEquals(0, pitResponse.getSkippedShards());
List<String> pitIds = new ArrayList<>();
pitIds.add(pitResponse.getId());
DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds);
DeletePitResponse deletePitResponse = execute(deletePitRequest, highLevelClient()::deletePit, highLevelClient()::deletePitAsync);
assertTrue(deletePitResponse.getDeletePitResults().get(0).isSucceeded());
assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(pitResponse.getId()));
}

public void testDeleteAllPits() throws IOException {
CreatePitRequest pitRequest = new CreatePitRequest(new TimeValue(1, TimeUnit.DAYS), true, "index");
CreatePitResponse pitResponse = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
CreatePitResponse pitResponse1 = execute(pitRequest, highLevelClient()::createPit, highLevelClient()::createPitAsync);
assertTrue(pitResponse.getId() != null);
assertTrue(pitResponse1.getId() != null);
List<String> pitIds = new ArrayList<>();
pitIds.add("_all");
DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds);
DeletePitResponse deletePitResponse = execute(
deletePitRequest,
highLevelClient()::deleteAllPits,
highLevelClient()::deleteAllPitsAsync
);
for (DeletePitInfo deletePitInfo : deletePitResponse.getDeletePitResults()) {
assertTrue(deletePitInfo.isSucceeded());
}
}
/**
* Todo: add deletion logic and test cluster settings
*/
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchScrollRequest;
Expand Down Expand Up @@ -1326,6 +1327,27 @@ public void testCreatePit() throws IOException {
assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue());
}

public void testDeletePit() throws IOException {
List<String> pitIds = new ArrayList<>();
pitIds.add("pitid1");
pitIds.add("pitid2");
DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds);
Request request = RequestConverters.deletePit(deletePitRequest);
String endpoint = "/_search/point_in_time";
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertEquals(endpoint, request.getEndpoint());
assertToXContentBody(deletePitRequest, request.getEntity());
assertEquals(REQUEST_BODY_CONTENT_TYPE.mediaTypeWithoutParameters(), request.getEntity().getContentType().getValue());
}

public void testDeleteAllPits() {
DeletePitRequest deletePitRequest = new DeletePitRequest();
Request request = RequestConverters.deleteAllPits(deletePitRequest);
String endpoint = "/_search/point_in_time/_all";
assertEquals(HttpDelete.METHOD_NAME, request.getMethod());
assertEquals(endpoint, request.getEndpoint());
}

public void testSearchTemplate() throws Exception {
// Create a random request.
String[] indices = randomIndicesNames(0, 5);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
Expand Down Expand Up @@ -103,6 +105,7 @@
import org.junit.Before;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -793,7 +796,16 @@ public void testSearchWithPit() throws Exception {
assertThat(((Number) hit.getSortValues()[0]).longValue(), equalTo(counter++));
}
} finally {
// TODO : Delete PIT
List<String> pitIds = new ArrayList<>();
pitIds.add(pitResponse.getId());
DeletePitRequest deletePitRequest = new DeletePitRequest(pitIds);
DeletePitResponse deletePitResponse = execute(
deletePitRequest,
highLevelClient()::deletePit,
highLevelClient()::deletePitAsync
);
assertTrue(deletePitResponse.getDeletePitResults().get(0).isSucceeded());
assertTrue(deletePitResponse.getDeletePitResults().get(0).getPitId().equals(pitResponse.getId()));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
"delete_all_pits":{
"documentation":{
"url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/",
"description":"Deletes all active point in time searches."
},
"stability":"stable",
"url":{
"paths":[
{
"path":"/_search/point_in_time/_all",
"methods":[
"DELETE"
]
}
]
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
{
"delete_pit":{
"documentation":{
"url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/",
"description":"Deletes one or more point in time searches based on the IDs passed."
},
"stability":"stable",
"url":{
"paths":[
{
"path":"/_search/point_in_time",
"methods":[
"DELETE"
]
}
]
},
"body":{
"description":"A comma-separated list of pit IDs to clear"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

body has one more filed called required. As I understand, body is required for this API.

}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,13 @@
import org.opensearch.action.main.TransportMainAction;
import org.opensearch.action.search.ClearScrollAction;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.MultiSearchAction;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportCreatePitAction;
import org.opensearch.action.search.TransportDeletePitAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
Expand Down Expand Up @@ -399,6 +401,7 @@
import org.opensearch.rest.action.search.RestClearScrollAction;
import org.opensearch.rest.action.search.RestCountAction;
import org.opensearch.rest.action.search.RestCreatePitAction;
import org.opensearch.rest.action.search.RestDeletePitAction;
import org.opensearch.rest.action.search.RestExplainAction;
import org.opensearch.rest.action.search.RestMultiSearchAction;
import org.opensearch.rest.action.search.RestSearchAction;
Expand Down Expand Up @@ -660,6 +663,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(DeleteDanglingIndexAction.INSTANCE, TransportDeleteDanglingIndexAction.class);
actions.register(FindDanglingIndexAction.INSTANCE, TransportFindDanglingIndexAction.class);
actions.register(CreatePitAction.INSTANCE, TransportCreatePitAction.class);
actions.register(DeletePitAction.INSTANCE, TransportDeletePitAction.class);

return unmodifiableMap(actions.getRegistry());
}
Expand Down Expand Up @@ -835,6 +839,8 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {

// Point in time API
registerHandler.accept(new RestCreatePitAction());
registerHandler.accept(new RestDeletePitAction());

for (ActionPlugin plugin : actionPlugins) {
for (RestHandler handler : plugin.getRestHandlers(
settings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
import org.opensearch.tasks.Task;
import org.opensearch.transport.Transport;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiFunction;
Expand Down Expand Up @@ -246,7 +249,7 @@ public void onResponse(final Collection<UpdatePitContextResponse> responses) {

@Override
public void onFailure(final Exception e) {
cleanupContexts(contexts);
cleanupContexts(contexts, createPITResponse.getId());
updatePitIdListener.onFailure(e);
}
}, size);
Expand All @@ -255,19 +258,32 @@ public void onFailure(final Exception e) {
/**
* Cleanup all created PIT contexts in case of failure
*/
private void cleanupContexts(Collection<SearchContextIdForNode> contexts) {
ActionListener<Integer> deleteListener = new ActionListener<>() {
private void cleanupContexts(Collection<SearchContextIdForNode> contexts, String pitId) {
ActionListener<DeletePitResponse> deleteListener = new ActionListener<>() {
@Override
public void onResponse(Integer freed) {
// log the number of freed contexts - this is invoke and forget call
logger.debug(() -> new ParameterizedMessage("Cleaned up {} contexts out of {}", freed, contexts.size()));
public void onResponse(DeletePitResponse response) {
// this is invoke and forget call
bharath-techie marked this conversation as resolved.
Show resolved Hide resolved
if (!logger.isDebugEnabled()) return;
for (DeletePitInfo deletePitInfo : response.getDeletePitResults()) {
if (!deletePitInfo.isSucceeded()) {
logger.debug(() -> new ParameterizedMessage("Failed to delete PIT ID {}", deletePitInfo.getPitId()));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why debug, should this be warn instead?. Lets have a single log line with concatenated ids?

} else {
logger.debug(() -> new ParameterizedMessage("Deleted PIT with ID {}", deletePitInfo.getPitId()));
}
}
}

@Override
public void onFailure(Exception e) {
logger.error("Cleaning up PIT contexts failed ", e);
}
};
ClearScrollController.closeContexts(clusterService.state().getNodes(), searchTransportService, contexts, deleteListener);
Map<String, List<PitSearchContextIdForNode>> nodeToContextsMap = new HashMap<>();
for (SearchContextIdForNode context : contexts) {
List<PitSearchContextIdForNode> contextIdsForNode = nodeToContextsMap.getOrDefault(context.getNode(), new ArrayList<>());
contextIdsForNode.add(new PitSearchContextIdForNode(pitId, context));
nodeToContextsMap.put(context.getNode(), contextIdsForNode);
}
SearchUtils.deletePitContexts(nodeToContextsMap, deleteListener, clusterService.state(), searchTransportService);
}
}
Loading